BDM-MIRI - Big Data Management

Jose Antonio Lorencio Abril

Spring 2023


Professor: Alberto Abelló

Student e-mail: jose.antonio.lorencio@estudiantat.upc.edu

This is a summary of the course Big Data Management taught at the Universitat Politècnica de Catalunya by Professor Alberto Abelló in the academic year 22/23. Most of the content of this document is adapted from the course notes by Abelló and Nadal, [1], so I won’t be citing it all the time. Other references will be provided when used.

Contents

 1 Introduction to Big Data
  1.1 Recognise the relevance of data driven decision making
  1.2 Identify the three high level categories of analytical tools
  1.3 Identify the two main sources of Big Data
  1.4 Give a definition of Big Data
  1.5 Compare traditional data warehousing against Big Data management
  1.6 Distinguish descriptive, predictive and prescriptive analysis
  1.7 Explain the novelty of Cloud Computing
  1.8 Justify the benefits of Cloud Computing
  1.9 Explain the link between Big Data and Cloud Computing
  1.10 Distinguish the main four service levels in Cloud Computing
 2 Big Data Design
  2.1 Define the impedance mismatch
  2.2 Identify applications handling different kinds of data
  2.3 Name four different kinds of NOSQL systems
  2.4 Explain three consequences of schema variability
  2.5 Explain the consequences of physical independence
  2.6 Explain the two dimensions to classify NOSQL systems according to how they manage the schema
  2.7 Explain the three elements of the RUM conjecture
  2.8 Justify the need of polyglot persistence
  2.9 Decide whether two NOSQL designs have more or less explicit/fix schema
  2.10 Given a relatively small UML conceptual diagram, translate it into a logical representation of data considering flexible schema representation
 3 Distributed Data Management
  3.1 Give a definition of Distributed System
  3.2 Enumerate the six challenges of a Distributed System
  3.3 Give a definition of Distributed Database
  3.4 Explain the different transparency layers in DDBMS
  3.5 Identify the requirements that distribution imposes on the ANSI/SPARC architecture
  3.6 Draw a classical reference functional architecture for DDBMS
  3.7 Enumerate the eight main features of Cloud Databases
  3.8 Explain the difficulties of Cloud Database providers to have multiple tenants
  3.9 Enumerate the four main problems tenants/users need to tackle in Cloud Databases
  3.10 Distinguish the cost of sequential and random access
  3.11 Explain the difference between the cost of sequential and random access
  3.12 Distinguish vertical and horizontal fragmentation
  3.13 Recognize the complexity and benefits of data allocation
  3.14 Explain the benefits of replication
  3.15 Discuss the alternatives of a distributed catalog
  3.16 Decide when a fragmentation strategy is correct
 4 Distributed Data Processing
  4.1 Explain the CAP theorem
  4.2 Identify the 3 configuration alternatives given by the CAP theorem
  4.3 Explain the 4 synchronization protocols we can have
  4.4 Explain what eventual consistency means
  4.5 Enumerate the phases of distributed query processing
  4.6 Explain the difference between data shipping and query shipping
  4.7 Explain the meaning of ’reconstruction’ and ’reduction’ in syntactic optimization
  4.8 Explain the purpose of the ’exchange’ operator in physical optimization
  4.9 Enumerate the 4 different cost factors in distributed query processing
  4.10 Distinguish between response time and query time
  4.11 Explain the different kinds of parallelism
  4.12 Identify the impact of fragmentation in intra-operator parallelism
  4.13 Explain the impact of tree topologies (i.e. linear and bushy) in inter-operator parallelism
  4.14 Explain the limits of scalability
  4.15 Given the overall number of machines in the cluster, identify the consistency problems that arise depending on the configuration of the number of required replicas read and written to confirm the corresponding operations
  4.16 Given a parallel system and a workload, find the number of machines maximizing throughput
  4.17 Estimate the cost of a distributed query
  4.18 Given a query and a database design, recognize the difficulties and opportunities behind distributed query processing
 5 Hadoop Distributed File System (HDFS)
  5.1 Recognize the need of persistent storage
  5.2 Enumerate the design goals of GFS
  5.3 Explain the structural components of HDFS
  5.4 Name three file formats in HDFS and explain their differences
  5.5 Recognize the importance of choosing the file format depending on the workload
  5.6 Explain the actions of the coordinator node in front of chunkserver failure
  5.7 Explain a mechanism to avoid overloading the master node in HDFS
  5.8 Explain how data is partitioned and replicated in HDFS
  5.9 Transaction management
  5.10 Recognize the relevance of sequential read
  5.11 Choose the format for an HDFS file based on heuristics
  5.12 Estimate the data retrieved by scan, projection and selection operations in SequenceFile, Avro and Parquet
 6 HBase
  6.1 Give the definition of the BigTable data model
  6.2 Explain what a map structure is
  6.3 Explain the difference between a Key-Value and a Wide-Column store
  6.4 Enumerate the main schema elements of HBase
  6.5 Explain the main operations available of HBase
  6.6 Enumerate the main functional components of HBase
  6.7 Explain the role of the different functional components in HBase
  6.8 Explain the tree structure of data in HBase
  6.9 Explain the 3 basic algorithms of HBase
  6.10 Explain the main components and behavior of an LSM-tree
  6.11 Compare a distributed tree against a hash structure of data
  6.12 Justify the need of dynamic hashing
  6.13 Explain the structure of the HBase catalog
  6.14 Explain the mistake compensation mechanism of the cache in HBase client
  6.15 Enumerate the ACID guarantees provided by HBase
  6.16 Explain the execution flow of an HBase query both at global and local levels
  6.17 Given few queries, define the best logical structure of a table considering its physical implications in terms of performance
  6.18 Given the data in two leafs of a Log-Structured Merge-tree (LSM-tree), merge them
  6.19 Given the current structure of a Linear Hash, modify it according to insertions potentially adding buckets
  6.20 Given the current structure of a Consistent Hash, modify it in case of adding a bucket
  6.21 Calculate the number of round trips needed in case of mistake compensation of the tree metadata
  6.22 Use HBase shell to create a table and access it
  6.23 Use HBase API to create a table and access it
 7 Document Stores: MongoDB
  7.1 Explain the main difference between key-value and document stores
  7.2 Explain the main resemblances and differences between XML and JSON documents
  7.3 Explain the design principle of documents
  7.4 Name 3 consequences of the design principle of a document store
  7.5 Explain the difference between relational foreign keys and document references
  7.6 Exemplify 6 alternatives in deciding the structure of a document
  7.7 Explain the difference between JSON and BSON
  7.8 Name the main functional components of MongoDB architecture
  7.9 Explain the role of ’mongos’ in query processing
  7.10 Explain what a replica set is in MongoDB
  7.11 Name the three storage engines in MongoDB
  7.12 Explain what shard and chunk are in MongoDB
  7.13 Explain the two horizontal fragmentation mechanisms in MongoDB
  7.14 Explain how the catalog works in MongoDB
  7.15 Identify the characteristics of the replica synchronization management in MongoDB
  7.16 Explain how primary copy failure is managed in MongoDB
  7.17 Name the three query mechanisms of MongoDB
  7.18 Explain the query optimization mechanism of MongoDB
  7.19 Given two alternative structures of a document, explain the performance impact of the choice in a given setting
  7.20 Simulate splitting and migration of chunks in MongoDB
  7.21 Configure the number of replicas needed for confirmation on both reading and writing in a given scenario
  7.22 Perform some queries on MongoDB through the shell and aggregation framework
  7.23 Compare the access costs given different document designs
  7.24 Compare the access costs with different indexing strategies (i.e. hash and range based)
  7.25 Compare the access costs with different sharding distributions (i.e. balanced and unbalanced)
 8 MapReduce I
  8.1 Enumerate several use cases of MapReduce
  8.2 Explain 6 benefits of using MapReduce
  8.3 Describe what the MapReduce is in the context of a DDBMS
  8.4 Recognize the signature of Map and Reduce functions
  8.5 Explain the phases of a MapReduce operation
  8.6 Justify to which extent MapReduce is generic
  8.7 Simulate the execution of a simple MapReduce algorithm from the user (agnostic of implementation details) perspective
  8.8 Identify the usefulness of MapReduce in a given use case
  8.9 Define the key in the output of the map for a simple problem
  8.10 Provide the pseudo-code of map and reduce functions for a simple problem
 9 MapReduce II
  9.1 Enumerate the different kind of processes in Hadoop MapReduce
  9.2 Draw the hierarchy of Hadoop MapReduce objects
  9.3 Explain the information kept in the Hadoop MapReduce coordinator node
  9.4 Explain how to decide the number of mappers and reducers
  9.5 Explain the fault tolerance mechanisms in Hadoop MapReduce
  9.6 Identify query shipping and data shipping in MapReduce
  9.7 Explain the effect of using the combine function in MapReduce
  9.8 Identify the synchronization barriers of MapReduce
  9.9 Explain the main problems and limitations of Hadoop MapReduce
  9.10 Apply the different steps of a MapReduce execution at the implementation level
  9.11 Decide on the use of the combine function
 10 Spark I
  10.1 Name the main Spark contributions and characteristics
  10.2 Compare MapReduce and Spark
  10.3 Define a dataframe
  10.4 Distinguish dataframe from relation and matrix
  10.5 Distinguish Spark and Pandas dataframe
  10.6 Enumerate some abstraction on top of Spark
  10.7 Provide the Spark pseudo-code for a simple problem using dataframes
 11 Spark II
  11.1 Define RDD
  11.2 Distinguish between Base RDD and Pair RDD
  11.3 Distinguish between transformations and actions
  11.4 Explain available transformations
  11.5 Explain available actions
  11.6 Name the main Spark runtime components
  11.7 Explain how to manage parallelism in Spark
  11.8 Explain how recoverability works in Spark
  11.9 Distinguish between narrow and wide dependencies
  11.10 Name the two mechanisms to share variables
  11.11 Provide the Spark pseudo-code for a simple problem using RDDs
 12 Stream Data Management
  12.1 Define a data stream
  12.2 Distinguish the two kinds of stream management systems
  12.3 Recognize the importance of stream management
  12.4 Enumerate the most relevant characteristics of streams
  12.5 Explain to which extent a DBMS can manage streams
  12.6 Name 10 differences between DBMS and SPE
  12.7 Characterize the kinds of queries in an SPE
  12.8 Explain the two parameters of a sliding window
  12.9 Explain the three stream architectural patterns
  12.10 Explain the goals of Spark streaming architecture
  12.11 Draw the architecture of Spark streaming
  12.12 Identify the need of a stream ingestion pattern
  12.13 Identify the need of a near-real time processing pattern
  12.14 Identify the kind of message exchange pattern
  12.15 Simulate the mesh-join algorithm
  12.16 Estimate the cost of the mesh-join algorithm
  12.17 Use windowing transformations in Spark streaming
 13 Data Stream Analysis
  13.1 Explain the difference between generic one-pass algorithms and stream processing
  13.2 Name the two challenges of stream processing
  13.3 Name two solutions to limited processing capacity
  13.4 Name three solutions to limited memory capacity
  13.5 Decide the probability of keeping a new element or removing an old one from memory to keep equi-probability on load shedding
  13.6 Decide the parameters of the hash function to get a representative result on load shedding
  13.7 Decide the optimum number of hash functions in a Bloom filter
  13.8 Approximate the probability of false positives in a Bloom filter
  13.9 Calculate the weighted average of an attribute considering an exponentially decaying function
  13.10 Decide if heavy hitters will show false positives
 14 Big Data Architectures
  14.1 Explain the problem of a spaghetti architecture
  14.2 Explain the need of the Lambda Architecture
  14.3 Explain the difference between Kappa and Lambda Architectures
  14.4 Justify the need of a Data Lake
  14.5 Identify the difficulties of a Data Lake
  14.6 Explain the need of each component in the Bolster Architecture
  14.7 Map the components of the Bolster Architecture to the RDBMS Architecture
  14.8 Given a use case, define its software architecture
 15 In-Memory Columnar Databases (New Relational Architecture)
  15.1 Justify the viability of in-memory databases
  15.2 Explain the principles of NUMA architecture
  15.3 Enumerate 3 techniques to optimize cache usage
  15.4 Give 4 arguments supporting columnar storage
  15.5 Explain 3 classical optimization techniques in RDBMS related to column storage
  15.6 Sketch the functional architecture of SanssouciuDB
  15.7 Explain 4 optimizations implemented in SanssouciuDB to improve data access
  15.8 Explain how to choose the best layout
  15.9 Identify the difference between column-stores and NOSQL related to transactions
  15.10 Explain 3 difficulties of pipelining in column-stores
  15.11 Explain 3 problems to implement parallelism in in-memory column-stores
  15.12 Explain 5 query optimization techniques specific to columnar storage
  15.13 Given a data setting, justify the choice of either row or column storage
  15.14 Given the data in a column, use run-length encoding with dictionary to compress it

List of Figures

Business Intelligence Cycle.
Big Data Cycle.
Extended ANSI/SPARC architecture.
Extended ANSI/SPARC architecture with distribution.
Functional architecture of a centralized DBMS.
Functional architecture of a DDBMS.
Replica synchronization alternatives. Source: [2].
HDFS Architecture. Source: HDFS-Architecture.
Memory caching diagram. Source: Hazelcast Glossary of Terms.
Window parameters visualization.

List of Tables

Pros and cons of each data format of HDFS.
Comparison of data formats of HDFS.
A StoreFile.

List of Algorithms

1 Introduction to Big Data

1.1 Recognise the relevance of data driven decision making

Data driven decision making is the strategy of using data to make decisions in order to improve the chances of obtaining a positive outcome. It has been gaining importance in recent years, mainly because the data generation rate is increasing rapidly, enabling richer analyses for those who are able to leverage all this data.

The ability to collect, store, combine and analyze relevant data enables companies to gain a competitive advantage over competitors that are not able to take on these tasks.

In a nutshell, it is the confluence of three major socio-economic and technological trends that makes data driven innovation a new phenomenon:

1.2 Identify the three high level categories of analytical tools

Business Intelligence (BI) is the concept of using dashboards to represent the status and evolution of companies, using data from the different applications of the company's production systems, which needs to be processed with ETL (Extract, Transform, Load) pipelines into a Data Warehouse. This data is then modelled into data cubes, which are queried for OLAP (OnLine Analytical Processing) purposes. This setup enables three categories of analytical tools:

  1. Static generation of reports.

  2. Dynamic (dis)aggregation and navigation by means of OLAP operations.

  3. Inference of hidden patterns or trends with data mining tools.

1.3 Identify the two main sources of Big Data

The two main sources of Big Data are:

1.4 Give a definition of Big Data

Big Data is a natural evolution of Business Intelligence: it inherits its ultimate goal of transforming raw data into valuable knowledge, and it can be characterized in terms of the five V's:

1.5 Compare traditional data warehousing against Big Data management

In traditional business intelligence, data from different sources inside the company is ETL-processed into the data warehouse, which can then be analyzed using the three types of analyses we’ve seen (Reports, OLAP, DM), in order to extract useful information that ultimately affects the strategy of the company. This is summarized in Figure 1. As highlighted in the figure, the data warehousing process encompasses the ETL processes and the Data Warehouse design and maintenance.



Figure 1: Business Intelligence Cycle.


In the context of big data, the focus shifts from analyzing data from internal sources only to data from all types of heterogeneous sources. In this setup, instead of running an ETL process, the data is collected through a process of ingestion and stored into a Data Lake (from which analysts extract data and perform all necessary transformations a posteriori) or a Polystore (a DBMS built on top of different other technologies, able to cope with heterogeneous data). Whatever the storage decision, Big Data Analytics are then performed on this data, differentiating:

This process is depicted in Figure 2. In this diagram, we see that the Big Data Management consists of the task of ingestion, together with the design and maintenance of the Data Lake / Polystore.



Figure 2: Big Data Cycle.


Thus, the differences are:

1.6 Distinguish descriptive, predictive and prescriptive analysis

1.7 Explain the novelty of Cloud Computing

The novelty of cloud computing is analogous to the shift of electricity from being generated by each company to being centrally generated, benefiting from economies of scale and improving the efficiency of electricity generation. In the case of Cloud Computing, the shift is from companies owning their own hardware and software to an environment in which these resources are offered by a third-party company, which again leverages economies of scale and the possibility to allocate resources only when needed, increasing the overall efficiency of the tech industry and reducing the costs of each company, as they no longer need to buy expensive hardware and software, maintain them, etc.

1.8 Justify the benefits of Cloud Computing

1.9 Explain the link between Big Data and Cloud Computing

Cloud computing and big data are closely related, and in many ways, cloud computing has enabled the growth and adoption of big data technologies.

One of the main advantages of cloud computing is its ability to provide flexible and scalable computing resources on demand. This is especially important for big data, which requires significant computing power to process and analyze large volumes of data. Cloud computing allows organizations to easily spin up large-scale computing clusters and storage systems to handle big data workloads, without the need to invest in expensive on-premises infrastructure.

In addition to providing scalable computing resources, cloud computing also offers a wide range of data storage and processing services that can be used for big data workloads. Cloud providers offer a variety of data storage services, such as object storage, file storage, and database services, that can be used to store and manage large volumes of data. Cloud providers also offer big data processing services, such as Apache Hadoop, Apache Spark, and machine learning tools, which can be used to analyze and extract insights from big data.

Cloud computing also provides the ability to easily integrate and share data between different systems and applications, both within an organization and with external partners. This is important for big data, which often requires data from multiple sources to be combined and analyzed to gain insights.

Overall, cloud computing has played a key role in enabling the growth and adoption of big data technologies, by providing flexible and scalable computing resources, a wide range of data storage and processing services, and the ability to easily integrate and share data between different systems and applications.

1.10 Distinguish the main four service levels in Cloud Computing

The main four service levels are:

But there are more services offered by Cloud Computing:

2 Big Data Design

2.1 Define the impedance mismatch

The impedance mismatch is the set of difficulties that arise when data must be moved between components that represent it with different models or structures. It often appears when data is passed between different layers of an application, such as between the front-end user interface and the back-end database, or between different applications that need to exchange data. The data structures used in each layer or system may be different, which can cause issues with data mapping, performance, and scalability.

For example, if a front-end application requires data that is stored in a relational database, the application may need to perform complex queries to retrieve and transform the data into a format that can be used by the user interface. This can lead to performance issues and increased complexity in the application code. Similarly, if different applications or services use different data formats or structures, it can be difficult to exchange data between them, which can lead to integration issues and increased development time.

To address impedance mismatch, software developers often use techniques such as object-relational mapping (ORM) to map data between different layers of an application, or use standard data formats such as JSON or XML to enable data exchange between different systems. These techniques can help to simplify data mapping, improve performance, and increase the scalability of the system.
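As a minimal illustration of the mismatch and of an ORM-style mapping that bridges it (the User class, table layout and helper functions below are illustrative assumptions, not from the course):

from dataclasses import dataclass

@dataclass
class User:
    # Application-side representation: an object with a multi-valued attribute
    name: str
    emails: list[str]

def to_rows(user_id: int, user: User) -> tuple[dict, list[dict]]:
    """Flatten the object into relational rows: the list of emails cannot
    live in a single column, so it becomes a separate table."""
    user_row = {"id": user_id, "name": user.name}
    email_rows = [{"user_id": user_id, "email": e} for e in user.emails]
    return user_row, email_rows

def from_rows(user_row: dict, email_rows: list[dict]) -> User:
    """Reassemble the object from its rows (the 'join' the application pays for)."""
    return User(name=user_row["name"],
                emails=[r["email"] for r in email_rows if r["user_id"] == user_row["id"]])

# Round trip: object -> rows -> object
u = User("Alice", ["alice@upc.es", "alice@gmail.com"])
row, emails = to_rows(1, u)
assert from_rows(row, emails) == u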

2.2 Identify applications handling different kinds of data

Note that many applications use multiple types of data models, depending on the nature of the data and the requirements of the application. For example, a social network might use a graph database to store social connections, a column-family database to store user data, and a key-value database to cache frequently accessed data.

2.3 Name four different kinds of NOSQL systems

2.4 Explain three consequences of schema variability

Schema variability refers to the dynamic and flexible nature of data models in NoSQL databases. Unlike relational databases, NoSQL databases allow for the schema to be flexible and adaptable, which means that the data structure can evolve over time without requiring changes to the database schema. This allows for greater agility in data modeling, as it makes it easier to add or remove fields or change the data structure as needed.

Its three main consequences are:

Some more consequences:

2.5 Explain the consequences of physical independence

Physical independence is a key principle of the relational data model, which refers to the ability to change the physical storage of the data without affecting the logical structure of the data or the application programs that use the data. This means that the application should be able to access and manipulate the data without being aware of the underlying physical storage details, such as the storage medium or the location of the data.

The consequences of physical independence include:

Nonetheless, physical independence exacerbates the impedance mismatch problem: if data is needed in a form different from how it is stored, it has to be transformed, introducing a computational overhead. If we store the data exactly as the application needs it, this problem is reduced, but physical independence can be lost.

2.6 Explain the two dimensions to classify NOSQL systems according to how they manage the schema

The schema can be explicit/implicit:

And it can be fixed/variable:

Note that this is not a strict classification, but rather two dimension ranges in which a certain schema can lie.

2.7 Explain the three elements of the RUM conjecture

The RUM conjecture states that any data access method trades off three overheads: Read overhead, Update overhead and Memory overhead. There is a fundamental asymmetry between them, and a structure can be optimized for at most two of the three at the expense of the third: for instance, auxiliary structures that speed up reads consume extra memory and slow down updates, while a compact, update-friendly layout penalizes reads.

The RUM conjecture has three main elements:

The RUM conjecture is often used to guide the design and optimization of database systems, as it provides a useful framework for understanding the trade-offs between different system parameters and performance metrics. By understanding the RUM trade-offs, database designers can make informed decisions about how to allocate resources, optimize queries, and balance the workload of the system.

2.8 Justify the need of polyglot persistence

A polyglot system is a system that uses multiple technologies, languages, and tools to solve a problem. In the context of data management, a polyglot system is one that uses multiple data storage technologies to store and manage data. For example, a polyglot system might use a combination of relational databases, NoSQL databases, and search engines to store different types of data.

Polyglot persistence is the practice of using multiple storage technologies to store different types of data within a single application. The idea is to choose the right tool for the job, and to use each technology to its fullest potential. For example, a polyglot system might use a NoSQL database to store unstructured data, a relational database to store structured data, and a search engine to provide full-text search capabilities.

There are several reasons why polyglot persistence is important:

In summary, polyglot persistence is a powerful approach to data management that allows you to use multiple storage technologies to store and manage different types of data within a single application. By adopting a polyglot approach, you can improve flexibility, performance, resilience, and future-proofing.

2.9 Decide whether two NOSQL designs have more or less explicit/fix schema

There are several factors that can be used to assess the flexibility and explicitness of a schema:

  1. Number of tables/collections: A schema with a large number of tables or collections is typically more explicit and less flexible than a schema with fewer tables or collections. This is because a large number of tables or collections often implies a more rigid structure, whereas a smaller number of tables or collections can allow for more flexibility.

  2. Number of columns/fields: A schema with a large number of columns or fields is typically more explicit and less flexible than a schema with fewer columns or fields. This is because a large number of columns or fields often implies a more rigid structure, whereas a smaller number of columns or fields can allow for more flexibility.

  3. Data types: A schema that uses a large number of data types is typically more explicit and less flexible than a schema that uses fewer data types. This is because a large number of data types often implies a more rigid structure, whereas a smaller number of data types can allow for more flexibility.

  4. Use of constraints: A schema that uses a large number of constraints (such as foreign keys or unique constraints) is typically more explicit and less flexible than a schema that uses fewer constraints. This is because constraints often imply a more rigid structure, whereas a schema with fewer constraints can allow for more flexibility.

  5. Use of inheritance: A schema that uses inheritance (such as table or collection inheritance) is typically more flexible and less explicit than a schema that does not use inheritance. This is because inheritance allows for more flexibility in the structure of the data, whereas a schema that does not use inheritance is typically more explicit in its structure.

Overall, a more explicit schema is one that has a more rigid structure, with more tables, fields, data types, and constraints, whereas a more flexible schema is one that has fewer tables, fields, data types, and constraints, and may use inheritance to provide more flexibility.

Some examples can be:

2.10 Given a relatively small UML conceptual diagram, translate it into a logical representation of data considering flexible schema representation

3 Distributed Data Management

3.1 Give a definition of Distributed System

A distributed system is a system whose components, located at networked computers, communicate and coordinate their actions only by passing messages.

3.2 Enumerate the six challenges of a Distributed System

The challenges of a distributed system are:

3.3 Give a definition of Distributed Database

A Distributed Database (DDB) is an integrated collection of databases that is physically distributed across sites in a computer network and a Distributed Database Management System (DDBMS) is the software system that manages a distributed database such that the distribution aspects are transparent to the users.

There are some terms worth detailing:

3.4 Explain the different transparency layers in DDBMS

In a DDBMS distribution transparency must be ensured, i.e., the system must guarantee data, network, fragmentation and replication transparency:

Note that all these transparency levels are incremental.

Note also that full transparency makes the management of distributed data very difficult, so it is widely accepted that data independence and network transparency are a must, but replication and/or fragmentation transparency might be relaxed to boost performance.

3.5 Identify the requirements that distribution imposes on the ANSI/SPARC architecture

The Extended ANSI/SPARC architecture was designed to provide a comprehensive framework for organizing and managing complex database systems. The extended architecture includes the same three levels as the original ANSI/SPARC architecture, but adds a fourth level, called the user level.

The four levels of the extended ANSI/SPARC architecture are:

This is summarized in Figure 3.



Figure 3: Extended ANSI/SPARC architecture.


This architecture does not consider distribution, so it needs to be adapted accordingly to provide distribution transparency. To this end, a global conceptual schema is needed to define a single logical database. But the database is composed of several nodes, each of which must now define a local conceptual schema and an internal schema. These adaptations are depicted in Figure 4.



Figure 4: Extended ANSI/SPARC architecture with distribution.


In both architectures, the mappings between the layers are stored in the global catalog, but in the distributed architecture there are two mappings that are particularly important, namely the fragmentation schema and the allocation schema.

3.6 Draw a classical reference functional architecture for DDBMS

The functional architecture of a centralized DBMS is depicted in Figure 5. The query manager is a component of a database management system (DBMS) that is responsible for handling user queries and managing the overall query processing. It is composed of several sub-components:

Once these steps are done, the execution manager launches the different operators in the access plan in order, building up the results appropriately.

The scheduler deals with the problem of keeping the databases in a consistent state, even when concurrent accesses occur, preserving isolation (I from ACID).

The recovery manager is responsible for preserving the consistency (C), atomicity (A) and durability (D) properties.

The buffer manager is responsible for bringing data to main memory from disk, and vice-versa, communicating with the operating system.



Figure 5: Functional architecture of a centralized DBMS.


This architecture is not sufficient to deal with distributed data. The functional architecture of a distributed DBMS (DDBMS) is depicted in Figure 6. As we can see, there are now two stages:

  1. Modules cooperate at the global level, transforming the data flow and mapping it to the lower layers, dealing with a single view of the database and the distribution transparency:

    1. The global query manager contains the view manager, security manager, constraint checker and query optimizer, which behave as in the centralized case, except for the optimizer, which now considers data location and consults the global schema to determine which node does what.

    2. The global execution manager inserts communication primitives in the execution plan and coordinates the execution of the pieces of the query in the different components to build up the final results from all the query pieces executed distributedly.

    3. The global scheduler receives the global execution plan and distributes tasks among the available sites, guaranteeing isolation between different users.

  2. Modules cooperate at the local level, with a very similar behavior to that of the centralized DBMS.


Figure 6: Functional architecture of a DDBMS.


3.7 Enumerate the eight main features of Cloud Databases

3.8 Explain the difficulties of Cloud Database providers to have multiple tenants

The difficulty can be summarized as the need to deal with a potentially high number of tenants and with the unpredictability of their workloads' characteristics. The popularity of a tenant can change very rapidly, which impacts the Cloud services hosting its products. Also, the activities that tenants perform can change.

Thus, the provider has to implement mechanisms to be able to deal with this variety and variability in the workloads.

Also, the system should tolerate failures and offer self-healing mechanisms, if possible.

Finally, the software should easily allow scaling out to guarantee the required latencies. Adding or upgrading machines should happen progressively, so that no service suspension is ever necessary.

3.9 Enumerate the four main problems tenants/users need to tackle in Cloud Databases

3.10 Distinguish the cost of sequential and random access

3.11 Explain the difference between the cost of sequential and random access

3.12 Distinguish vertical and horizontal fragmentation

Data fragmentation deals with the problem of breaking datasets into smaller pieces, decreasing the working unit in the distributed system. It is useful because applications and users are typically interested in accessing different subsets of the data. Different subsets are naturally needed at different nodes, so it makes sense to allocate fragments where they are most likely to be used. This is data locality.

There are two main fragmentation approaches:

3.13 Recognize the complexity and benefits of data allocation

Once the data is fragmented, we must decide where to place each segment, trying to optimize some criteria:

This problem is NP-hard and the optimal solution depends on many factors.

In a dynamic environment the workload and access patterns may change and all these statistics should always be available in order to find the optimal solution. Thus, the problem is simplified with certain assumptions and simplified cost models are built so that any optimization algorithm can be adopted to approximate the optimal solution.

There are several benefits to data allocation, including:

3.14 Explain the benefits of replication

Data replication refers to the process of making and maintaining multiple copies of data across multiple nodes in a distributed database system. There are several benefits to data replication, including:

3.15 Discuss the alternatives of a distributed catalog

The same design problems and criteria can be applied to the catalog, but now we are storing metadata. This requires two important considerations:

  1. Metadata is much smaller than data, which makes it easier to manage.

  2. Optimizing performance is much more critical, since accessing this metadata is a requirement for any operation in the system.

Many decisions are already made by the architects of the system, and only a few options can be parameterized when instantiating it.

A typical choice we can make in many NOSQL systems is having a secondary copy of the coordinator (mirroring) that takes control in case of failure. Of course, this redundancy consumes some resources.

3.16 Decide when a fragmentation strategy is correct

4 Distributed Data Processing

4.1 Explain the CAP theorem

The CAP theorem, also known as Brewer's theorem², is a principle that states that in a distributed system, it is impossible to simultaneously provide all three of the following guarantees:

According to the CAP theorem, a distributed system can only provide two out of these three guarantees at a time. In other words, a distributed system can either prioritize consistency and partition tolerance, consistency and availability, or availability and partition tolerance, but it cannot achieve all three simultaneously.

This theorem has important implications for the design and operation of distributed systems, as designers must carefully consider which trade-offs to make when choosing between consistency, availability, and partition tolerance. In large-scale distributed systems, network partitions are taken for granted. Thus, we must choose between consistency and availability: either we have an always-consistent system that becomes temporarily unavailable, or an always-available system that temporarily shows some inconsistencies.

4.2 Identify the 3 configuration alternatives given by the CAP theorem

4.3 Explain the 4 synchronization protocols we can have

There are two choices that generate four alternative configurations for replica synchronization management:

These two choices give rise to four possible alternatives, depicted in Figure 7.



Figure 7: Replica synchronization alternatives. Source: [2].


  1. A user can only modify the primary copy, and his changes are immediately propagated to any other existing copy (which can always be read by any user). Only after being properly propagated and changes acknowledged by all servers, the user receives confirmation.

  2. A user can only modify the primary copy, and receives confirmation of this change immediately. His changes are eventually propagated to any other existing copy (which can always be read by any user).

  3. A user can modify any replica, and her changes are immediately propagated to any other existing copy (which can always be read by any user). Only after being properly propagated and changes acknowledged by all servers, the user receives confirmation.

  4. A user can modify any replica, and receives confirmation of this change immediately. His changes are eventually propagated to any other existing copy (which can always be read by any user).

Alternatives 1 and 3 correspond to the traditional concept of consistency (eager propagation), while 2 and 4 correspond to the concept of eventual consistency (lazy propagation).

4.4 Explain what eventual consistency means

Eventual consistency is a concept in distributed databases that refers to a property of the system where all updates to a data item will eventually propagate to all nodes in the system and converge to a consistent state, given a sufficiently long period of time without updates.

In a distributed system, data is replicated across multiple nodes, and each node maintains a copy of the data. Due to network latency, nodes may have different versions of the data at any given time, leading to inconsistencies between replicas. Eventual consistency allows for these inconsistencies to exist temporarily until all nodes have received the updated data.

Eventual consistency does not guarantee immediate consistency between replicas, but it does ensure that all replicas will eventually converge to a consistent state. This property is particularly useful for distributed systems that prioritize availability and partition tolerance over consistency³, such as in large-scale web applications or data-intensive systems.

4.4.1 Replication management configurations

Let:

The inconsistency window is the time during which W < N.

Some usual configurations are:

4.5 Enumerate the phases of distributed query processing

  1. The global query optimizer performs:

    1. Semantic optimization

    2. Syntactic optimization:

      1. Generation of syntactic trees

      2. Data localization

      3. Reduction

    3. Global physical optimization

  2. Then, the local query optimizer performs local physical optimization.

4.6 Explain the difference between data shipping and query shipping

Data shipping and query shipping are both techniques used in distributed databases to improve performance and reduce network traffic. However, they differ in the way they handle data movement.

Data shipping involves moving the data itself from one node to another node in the network to execute the query. In other words, the data is shipped to the node where the query is executed. This approach works well when the amount of data being moved is small, and the network has low latency and high bandwidth.

On the other hand, query shipping involves shipping the query to the nodes where the data resides and executing the query on those nodes. In this approach, the network traffic is reduced because only the query is sent over the network, and the data remains in its original location. This approach works well when the data is large, and the network has high latency and low bandwidth.

It is possible to design hybrid strategies, in which it is dynamically decided what kind of shipping to perform.
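A back-of-the-envelope way to compare the two strategies is to compare the bytes that cross the network in each case. A minimal sketch, where the table size, query size and selectivity are made-up parameters for illustration:

def data_shipping_bytes(table_bytes: float) -> float:
    # The whole remote table is moved to the node that runs the query.
    return table_bytes

def query_shipping_bytes(query_bytes: float, result_bytes: float) -> float:
    # Only the query text travels one way and the result travels back.
    return query_bytes + result_bytes

table = 10 * 2**30           # 10 GiB remote table
query = 1 * 2**10            # ~1 KiB of query text
result = 0.001 * table       # highly selective query: 0.1% of the table survives

print(data_shipping_bytes(table) / 2**20, "MiB moved with data shipping")
print(query_shipping_bytes(query, result) / 2**20, "MiB moved with query shipping")
# With a selective query, query shipping moves ~0.1% of the data;
# with a query returning most of the table, the advantage disappears.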

4.7 Explain the meaning of ’reconstruction’ and ’reduction’ in syntactic optimization

Reconstruction refers to how the datasets are obtained from their fragments. For example, a dataset which is horizontally fragmented is reconstructed by means of unions.

On the other hand, reduction refers to the process of removing redundant or unnecessary operations from a query without changing its semantics. This is achieved by applying various optimization techniques, such as elimination of common sub-expressions, dead-code elimination, and constant folding, which can simplify the query execution plan and reduce the number of operations required to produce the result.

4.8 Explain the purpose of the ’exchange’ operator in physical optimization

The exchange operator is used to redistribute data between nodes when it is needed to complete a query. For example, when a query involves joining two tables that are partitioned across multiple nodes, the exchange operator is used to redistribute the data so that the join can be performed locally on each node, instead of sending all the data to a single node for processing.

The exchange operator can be used for both horizontal and vertical partitioning. In horizontal partitioning, the exchange operator is used to redistribute the rows of a table between nodes. In vertical partitioning, the exchange operator is used to redistribute the columns of a table between nodes.
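A minimal sketch of what a hash-based exchange does for a distributed join (the node count and rows are illustrative): both inputs are repartitioned on the join key so that matching rows land on the same node and the join can then run locally.

from collections import defaultdict

def exchange_hash(rows, key, n_nodes):
    """Redistribute rows among n_nodes by hashing the join key."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[hash(row[key]) % n_nodes].append(row)
    return partitions

R = [{"id": 1, "x": "a"}, {"id": 2, "x": "b"}, {"id": 3, "x": "c"}]
S = [{"id": 1, "y": 10}, {"id": 3, "y": 30}]

n = 2
R_parts, S_parts = exchange_hash(R, "id", n), exchange_hash(S, "id", n)

# After the exchange, each node joins only its own partitions.
for node in range(n):
    local_join = [r | s for r in R_parts[node] for s in S_parts[node] if r["id"] == s["id"]]
    print(f"node {node}: {local_join}")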

4.9 Enumerate the 4 different cost factors in distributed query processing

The cost is the sum of the local cost and the communication cost:

4.10 Distinguish between response time and query time

Query time (or execution time) is the time it takes the system to process a query, from the start of its execution until the results start being returned to the user (or are completely returned, if that is the relevant measure).

Response time is a broader term that refers to the time elapsed from the moment the user issues a query until she receives the response.

4.11 Explain the different kinds of parallelism

4.12 Identify the impact of fragmentation in intra-operator parallelism

Intra-operator parallelism is based on fragmenting data, so that the same operator can be executed in parallel by applying it to different fragments of the data.

If there is a pre-existing (a priori) fragmentation, it can be used for this. But even if the dataset has not been previously fragmented, the DDBMS can fragment it on the fly to benefit from this approach.

The input of an operation can be dynamically fragmented and parallelized, with different strategies:

If dynamic fragmentation is used, a new property containing information about the fragmentation strategy being used, the fragmentation predicates and the number of fragments produced must be added to the process tree.

4.13 Explain the impact of tree topologies (i.e. linear and bushy) in inter-operator parallelism

A linear query plan, also known as a pipeline, consists of a series of operators that are executed in a linear sequence. In this topology, inter-operator parallelism is limited because the output of one operator must be fully consumed by the next operator before it can begin processing its input. This means that the degree of parallelism is limited by the slowest operator in the pipeline.

On the other hand, a bushy query plan consists of multiple subtrees that can be executed in parallel. In this topology, operators are arranged in a more complex structure that allows for more inter-operator parallelism. For example, two independent subtrees can be executed in parallel, with the results of each subtree combined in a later operation.

In general, a bushy query plan is more amenable to inter-operator parallelism than a linear query plan. However, the degree of parallelism that can be achieved depends on many factors, including the number of available processing resources, the characteristics of the data being processed, and the specifics of the query being executed.

Linear trees can be exploited with parallelism by pipelining, which consists of creating a chain of nested iterators, one per operator in the process tree. The system pulls from the root iterator, which transitively propagates the call through all other iterators in the pipeline. This does not allow parallelism per se, but it can if we add a buffer to each iterator, so they can generate the next rows without waiting for a parent call. Thus, the producer leaves its result in an intermediate buffer and the consumer takes its content asynchronously. These buffers imply that stalls can happen when an operator becomes ready but no new input is available in its input buffer, propagating the stall to the rest of the chain.
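A minimal sketch of such a buffered pipeline between two operators, using a bounded queue as the intermediate buffer (the operator logic and buffer size are illustrative): the producer fills the buffer independently of the consumer, and a full or empty buffer is exactly what causes stalls.

import threading, queue

buffer = queue.Queue(maxsize=4)   # bounded intermediate buffer between operators
SENTINEL = object()               # marks the end of the stream

def scan_operator():
    # Producer: e.g. a table scan pushing rows into the buffer.
    for row in range(10):
        buffer.put(row)           # blocks (stalls) if the buffer is full
    buffer.put(SENTINEL)

def filter_operator():
    # Consumer: pulls rows asynchronously and applies a predicate.
    while (row := buffer.get()) is not SENTINEL:   # blocks (stalls) if the buffer is empty
        if row % 2 == 0:
            print("kept", row)

t = threading.Thread(target=scan_operator)
t.start()
filter_operator()
t.join()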

4.14 Explain the limits of scalability

Amdahl’s law states that

$$S(p, N) = \frac{1}{(1 - p) + \frac{p}{N}},$$

where S is the maximum improvement reachable by parallelizing the system, N is the number of subsystems, and p is the fraction of parallelizable work of the system.

This is generalized by the universal scalability law, which states that

$$C(\sigma, \kappa, N) = \frac{N}{1 + \sigma \cdot (N - 1) + \kappa \cdot N(N - 1)},$$

where C is the maximum improvement reachable by parallelizing the system, N is the number of subsystems, σ is the system’s contention or the non-parallelizable fraction work of the system, and κ is the system’s consistency delay, which models how much the different parallel units require communication.
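A quick numerical check of both formulas (the parameter values below are made up for illustration):

def amdahl(p: float, n: int) -> float:
    """Maximum speedup with a fraction p of parallelizable work on n units."""
    return 1.0 / ((1.0 - p) + p / n)

def usl(sigma: float, kappa: float, n: int) -> float:
    """Universal Scalability Law: capacity relative to a single unit."""
    return n / (1.0 + sigma * (n - 1) + kappa * n * (n - 1))

# With 95% parallelizable work, speedup saturates around 1/(1-p) = 20.
print([round(amdahl(0.95, n), 1) for n in (1, 10, 100, 1000)])   # [1.0, 6.9, 16.8, 19.6]

# With some contention (sigma) and consistency delay (kappa),
# capacity peaks and then *decreases* as machines are added.
print([round(usl(0.05, 0.001, n), 1) for n in (1, 10, 100, 1000)])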

Thus, scalability is limited by:

4.15 Given the overall number of machines in the cluster, identify the consistency problems that arise depending on the configuration of the number of required replicas read and written to confirm the corresponding operations

4.16 Given a parallel system and a workload, find the number of machines maximizing throughput

4.17 Estimate the cost of a distributed query

4.18 Given a query and a database design, recognize the difficulties and opportunities behind distributed query processing

5 Hadoop Distributed File System (HDFS)

5.1 Recognize the need of persistent storage

Persistent storage is important because it allows data to be stored and accessed even after a system or application has been shut down or restarted. Without persistent storage, data would be lost each time the system or application is shut down, which is clearly not desirable in most cases.

5.2 Enumerate the design goals of GFS

5.3 Explain the structural components of HDFS

The architecture is Coordinator-Worker:

They have the following characteristics:

A depiction of the architecture is in Figure 8.



Figure 8: HDFS Architecture. Source: HDFS-Architecture.


5.4 Name three file formats in HDFS and explain their differences







SequenceFile (horizontal layout)
  Description: simple binary file format consisting of key-value pairs.
  Pros: compact and efficient for large files; good for streaming.
  Cons: not as flexible as other formats; lacks some compression options.
  Use cases: log files, sensor data, web server logs.

Avro (horizontal layout)
  Description: data serialization system with a compact binary format and support for schema evolution.
  Pros: supports rich data types and schema evolution; can be used with many programming languages.
  Cons: may not be as efficient as some other formats; requires a schema.
  Use cases: data exchange between Hadoop and other systems, machine learning applications.

Zebra (vertical layout)
  Description: table-based format for structured data with support for indexing and filtering.
  Pros: provides indexing and filtering capabilities; efficient for join operations; good for projection-based workloads.
  Cons: limited support for compression; may not be as flexible as other formats.
  Use cases: data warehousing, OLAP.

ORC (hybrid layout)
  Description: Optimized Row Columnar format for storing Hive tables with support for compression and predicate pushdown.
  Pros: efficient for analytical workloads; supports predicate pushdown for filtering.
  Cons: requires a schema; not as widely supported as some other formats.
  Use cases: data warehousing, Hive tables.

Parquet (hybrid layout)
  Description: columnar file format with support for nested data and compression, optimized for query performance.
  Pros: supports nested data types and compression; efficient for analytical queries; supports predicate pushdown for filtering.
  Cons: not as efficient for write-heavy workloads; may require more memory.
  Use cases: analytics, data warehousing, machine learning.

Table 1: Pros and cons of each data format of HDFS.








Feature              SequenceFile   Avro         Zebra      ORC      Parquet
Layout               Horizontal     Horizontal   Vertical   Hybrid   Hybrid
Schema               No             Yes          Yes        Yes      Yes
Column pruning       No             No           Yes        Yes      Yes
Predicate pushdown   No             No           No         Yes      Yes
Metadata             No             No           No         Yes      Yes
Nested records       No             No           Yes        Yes      Yes
Compression          Yes            Yes          Yes        Yes      Yes
Encoding             No             Yes          No         Yes      Yes

Table 2: Comparison of data formats of HDFS.

Column pruning involves eliminating unnecessary columns from the result set of a query before executing the query. This is done by analyzing the query and determining which columns are required to satisfy the query. The optimizer then generates a plan that only includes the necessary columns, reducing the amount of I/O and CPU processing required to execute the query. Column pruning is particularly useful in queries that involve large tables with many columns, as it can significantly reduce the amount of data that needs to be scanned.

Predicate pushdown involves pushing down filtering conditions into the storage layer, rather than applying the filters after reading the data into memory. This is done by analyzing the query and determining which predicates can be evaluated at the storage layer before the data is read into memory. The storage layer then applies the predicates before returning the data to the query engine. This can significantly reduce the amount of data that needs to be read into memory, reducing the I/O and CPU processing required to execute the query. Predicate pushdown is particularly useful in queries that involve large tables with many rows, as it can significantly reduce the amount of data that needs to be read from disk.
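With a columnar format such as Parquet, both techniques are exposed directly by the reader API. A minimal sketch using pyarrow (assuming pyarrow is installed; the file name and column names are hypothetical):

import pyarrow.parquet as pq

# Column pruning: only the 'user_id' and 'amount' columns are read from disk.
# Predicate pushdown: row groups whose statistics show amount <= 100 can be skipped.
table = pq.read_table(
    "sales.parquet",                   # hypothetical file
    columns=["user_id", "amount"],     # projection pushed to the storage layer
    filters=[("amount", ">", 100)],    # selection pushed to the storage layer
)
print(table.num_rows)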

5.5 Recognize the importance of choosing the file format depending on the workload

As we have seen, each format provides a different set of features, which will affect the overall performance when retrieving the data from disk. There are heuristic rules to decide the most suitable file format depending on the kind of query to be executed:

5.6 Explain the actions of the coordinator node in front of chunkserver failure

The coordinator is in charge of failure detection and tolerance. Periodically, the namenode receives heartbeat messages from the datanodes. If a datanode systematically fails to send heartbeats, the namenode assumes that the datanode is unavailable and corrective actions must be taken. The namenode:

  1. Looks up the file namespace to find out what replicas were maintained in the lost chunkserver.

  2. These missing replicas are fetched from the other datanodes maintaining them.

  3. They are copied to a new datanode to get the system back to a robust state.

5.7 Explain a mechanism to avoid overloading the master node in HDFS

The strategy is based on caching metadata in the client, and works as follows:

  1. The first time a file is requested, the client applications must request the information from the namenode.

  2. The namenode instructs the corresponding datanodes to send the appropriate chunks. They are chosen according to their network proximity to the client, optimizing bandwidth.

  3. The datanodes send the chunks composing the file to the client application.

  4. The client is now able to read the file. But it also keeps the locations of all the chunks in a cache.

  5. If the client needs the same file, it does not need to ask the namenode, but can request directly to the datanodes whose information was cached before.

The set of caches in the clients can be seen as a strategy for fragmentation and replication of the catalog.
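A minimal sketch of this client-side cache (the class and method names are assumptions for illustration, not the real HDFS client API): the namenode is contacted only on a cache miss, and a stale entry is corrected by invalidating it and falling back to the namenode.

class HdfsClientCache:
    def __init__(self, namenode, datanodes):
        self.namenode = namenode     # object answering "where are the chunks of file X?"
        self.datanodes = datanodes   # map: datanode id -> datanode object
        self.locations = {}          # cache: file name -> list of (chunk id, datanode id)

    def read(self, filename):
        if filename not in self.locations:                  # cache miss: ask the namenode once
            self.locations[filename] = self.namenode.lookup(filename)
        chunks = []
        for chunk_id, dn_id in self.locations[filename]:
            try:
                chunks.append(self.datanodes[dn_id].get_chunk(chunk_id))
            except KeyError:                                 # stale entry: datanode is gone
                del self.locations[filename]                 # invalidate and retry via namenode
                return self.read(filename)
        return b"".join(chunks)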

5.8 Explain how data is partitioned and replicated in HDFS

5.9 Transaction management

HDFS applies an eager/primary-copy strategy for replica synchronization:

5.10 Recognize the relevance of sequential read

Sequential reads heavily benefit from data locality, which is mostly ignored by random access.

If we use a rotating disk, then the cost basically depends on three components:

The difference between sequential and random access is that random access does not find the data stored together, so its cost is:

$$C_{RA} = n \cdot (\text{seek} + \text{rotation} + \text{transfer}),$$

while for sequential access, seek and rotation are paid only once:

$$C_{SA} = \text{seek} + \text{rotation} + n \cdot \text{transfer}.$$

Moreover, a sequential access pattern makes the next read absolutely predictable, enabling pre-fetching and maximizing the effective read ratio by benefitting from the multiple layers of caching. In Figure 9, there is a diagram of how memory caching works. The closer to the disk, the more capacity the memory has, but with higher latency. The closer to the CPU, the faster and smaller the memory is. Thus, finding data in the top levels of the cache is hard, but crucial to gain performance.
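Plugging typical magnetic-disk numbers into the two cost formulas above (the values, in milliseconds, are made up for illustration) shows how large the gap becomes:

def random_access_cost(n, seek=8.0, rotation=4.0, transfer=0.05):
    # Every block pays seek + rotation + transfer.
    return n * (seek + rotation + transfer)

def sequential_access_cost(n, seek=8.0, rotation=4.0, transfer=0.05):
    # Seek and rotation are paid only once.
    return seek + rotation + n * transfer

n = 10_000   # blocks to read
print(random_access_cost(n) / 1000, "s for random access")        # ~120.5 s
print(sequential_access_cost(n) / 1000, "s for sequential access")  # ~0.5 s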



Figure 9: Memory caching diagram. Source: Hazelcast Glossary of Terms.


5.11 Choose the format for an HDFS file based on heuristics

5.12 Estimate the data retrieved by scan, projection and selection operations in SequenceFile, Avro and Parquet

6 HBase

6.1 Give the definition of the BigTable data model

A BigTable is a sparse, distributed, persistent, multi-dimensional, sorted map:

6.2 Explain what a map structure is

A map data structure is a collection of key-value pairs that allows fast and flexible access to its elements based on keys. Maps can store different types of keys and values, such as numbers, strings, objects, etc. Maps are useful for storing associations between two objects or values.
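In code, a map is simply a dictionary; the BigTable model can be pictured as a sorted map whose keys combine row, column and timestamp. A minimal sketch (the keys and values are illustrative):

# A plain map: key -> value
prices = {"milk": 1.2, "bread": 0.9}
print(prices["milk"])

# A BigTable-like view: (row key, column, timestamp) -> value, kept sorted by key
from collections import OrderedDict
cells = OrderedDict(sorted({
    ("Alice", "Personal:name", 1680000000): "Alice Smith",
    ("Alice", "Address:city",  1680000500): "Barcelona",
    ("Bob",   "Personal:name", 1680000100): "Bob Jones",
}.items()))
for key, value in cells.items():
    print(key, "->", value)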

6.3 Explain the difference between a Key-Value and a Wide-Column store

A key-value store is a simple model that stores data as pairs of keys and values. A wide-column store is a more complex model that stores data as rows and columns, where each row can have different columns. Some differences between them are:

Some examples of key-value stores are Redis, etcd and Memcached. Some examples of wide-column stores are Cassandra, HBase and ScyllaDB.

6.4 Enumerate the main schema elements of HBase

Example 6.1. An example Users table with two column families, Personal and Address:

Row key   Personal                                    Address
Alice     name: Alice Smith, email: alice@upc.es      street: Avinguda Diagonal, city: Barcelona
Bob       name: Bob Jones                             street: Gran Vía, city: Murcia
Charlie   name: Charlie Brown, email: charlie@ch.ch



If we wanted to recreate this example in HBase:

# 0 Open the HBase shell
# 1 Create the table Users with column families Personal and Address
create 'Users', 'Personal', 'Address'

# 2 Insert records (each put sets a single cell)
put 'Users', 'Alice', 'Personal:name', 'Alice Smith'
put 'Users', 'Alice', 'Personal:email', 'alice@upc.es'
put 'Users', 'Alice', 'Address:street', 'Avinguda Diagonal'

# 3 We can add more values (we forgot the city!)
put 'Users', 'Alice', 'Address:city', 'Barcelona'

# 4 Same for the rest

# 5 To read a row
get 'Users', 'Alice'
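The same table can also be created and accessed programmatically. A minimal sketch, assuming the third-party happybase Python client and a local HBase Thrift server (an assumption, not necessarily the client used in the course):

# Sketch using the third-party 'happybase' client against a local Thrift server.
import happybase

connection = happybase.Connection("localhost")  # assumed Thrift endpoint
connection.create_table("Users", {"Personal": dict(), "Address": dict()})

table = connection.table("Users")
table.put(b"Alice", {b"Personal:name": b"Alice Smith",
                     b"Personal:email": b"alice@upc.es",
                     b"Address:street": b"Avinguda Diagonal",
                     b"Address:city": b"Barcelona"})

print(table.row(b"Alice"))  # {b'Personal:name': b'Alice Smith', ...}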

6.5 Explain the main operations available of HBase

6.6 Enumerate the main functional components of HBase

6.7 Explain the role of the different functional components in HBase

6.8 Explain the tree structure of data in HBase

In HBase, data is stored in a tree structure that is composed of regions, stores, and memstores.

Within each store, data is stored in a column-oriented fashion, with all values for a given column stored together. This allows for efficient access to columns and column families, and also enables compression and other optimization techniques.

Thus, a StoreFile is a file in HFile format, consisting of several HDFS chunks of size 128 MB, which are structured into HBase blocks of size 64 KB.


StoreFile (HFile format): each 128 MB HDFS chunk is subdivided into 64 KB HBase blocks.

Table 3: A StoreFile.

6.9 Explain the 3 basic algorithms of HBase

HBase employs three basic algorithms to manage the storage and retrieval of data: flush, minor compaction, and major compaction.

Both minor and major compactions are configurable, and the frequency and timing of these operations can be adjusted based on the workload and resource availability of the HBase cluster.
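The core of both compactions is merging sorted runs. A minimal sketch (a simplification that ignores version retention, tombstones and block boundaries; the keys and values are made up) of merging two sorted StoreFile-like runs while keeping only the newest version of each key:

# Merge two sorted runs of (key, timestamp, value) entries into one sorted run,
# keeping only the most recent version of each key.
from heapq import merge

def compact(run_a, run_b):
    result = []
    # Order by key ascending, then timestamp descending, so the newest
    # version of each key comes first and is the one kept.
    for key, ts, value in merge(run_a, run_b, key=lambda e: (e[0], -e[1])):
        if not result or result[-1][0] != key:
            result.append((key, ts, value))
    return result

old_file = [("alice", 1, "a@old.es"), ("bob", 1, "b@upc.es")]
new_file = [("alice", 2, "alice@upc.es"), ("carol", 2, "c@upc.es")]
print(compact(old_file, new_file))
# [('alice', 2, 'alice@upc.es'), ('bob', 1, 'b@upc.es'), ('carol', 2, 'c@upc.es')]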

6.10 Explain the main components and behavior of an LSM-tree

The LSM-tree (Log-Structured Merge Tree) is a data structure that is used in many modern distributed databases, including HBase, Cassandra, and LevelDB. The LSM-tree is designed to provide efficient and scalable read and write performance for large datasets. The main components and behavior of an LSM-tree are:

As has been outlined, there are two main maintenance operations:

6.11 Compare a distributed tree against a hash structure of data

A distributed tree and a hash structure are two different approaches to organizing data in a distributed system, and each has its own strengths and weaknesses.

A distributed tree, such as HBase’s region-based storage model, uses a hierarchical structure to organize data. The tree is partitioned into regions, with each region containing a range of contiguous row keys. Each region is stored on a separate server, allowing the system to scale horizontally by adding more servers as needed. This approach is well-suited for read-heavy workloads where data is frequently accessed based on its key, as it allows for efficient range scans and lookups of individual keys. However, it can be less efficient for write-heavy workloads, as writes may require updating multiple nodes in the tree.

A hash structure, on the other hand, uses a non-hierarchical approach to organizing data, with each item in the structure being assigned a unique key based on a hash function. This allows for efficient storage and retrieval of data based on its key, as the hash function can be used to quickly locate the relevant data without requiring a hierarchical lookup. Hash structures are well-suited for write-heavy workloads, as they can be designed to minimize the number of nodes that need to be updated for each write. However, they may be less efficient for range scans or other types of queries that require traversing large amounts of data.

In general, the choice between a distributed tree and a hash structure will depend on the specific requirements of the application and the workload it needs to support. Both approaches have their strengths and weaknesses, and the best choice will depend on factors such as the size and structure of the data, the expected read and write patterns, and the performance and scalability requirements of the system.

6.12 Justify the need of dynamic hashing

Dynamic hashing is a technique used in database management systems to handle collisions that can occur when multiple keys are hashed to the same index in a hash table. This technique involves adjusting the size of the hash table dynamically as the number of keys increases, to maintain a low collision rate and ensure efficient access to the data.

There are several reasons why dynamic hashing is necessary:

Without dynamic hashing, the usual approach is to have a hash function f(x) and an assignment function h(x). The hash function distributes the values over a given range, ideally uniformly, while the assignment function decides in which region the record should be stored. A typically used assignment function is

h(x) = f(x) mod #servers

Note, nonetheless, that in this case adding a new server implies modifying the assignment function, which in turn requires communicating the new function to all servers and triggers a massive data transfer. Hence the importance of dynamic hashing.

Another challenge is that every access must go through the hash directory.

Example 6.2. Let’s see what could happen with an example. Imagine we have three nodes working normally, until this point is reached:




N0: 0, 3, 6, 9
N1: 1, 4, 7, 10
N2: 2, 5, 8, 11



If we now add a new machine because we need more resources, the situation after the restructuring would be as follows:





N0: 0, 4, 8, 12
N1: 1, 5, 9
N2: 2, 6, 10
N3: 3, 7, 11




Here, all records except 0, 1 and 2 have had to move to a different node. We see how this situation entails high transfer costs.
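A quick check of this cost, using the assignment function h(x) = f(x) mod #servers on the twelve records of the example:

# Count how many records change node when going from 3 to 4 servers.
records = range(12)

def assign(x, n_servers):
    return x % n_servers

moved = sum(1 for x in records if assign(x, 3) != assign(x, 4))
print(f"{moved} of {len(records)} records move")  # 9 of 12 records move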

6.12.1 Linear hash

Linear Hashing is a dynamic hashing technique that enables a hash table to grow or shrink dynamically as the number of keys increases or decreases. The hash table is divided into a series of buckets, each of which can hold one or more keys. When the number of keys in a bucket exceeds a certain threshold, the bucket is split, and a new bucket is created to hold the overflow keys.

We maintain a pointer to the next bucket to split, and two hash functions are considered. We take n such that 2^n ≤ #servers < 2^(n+1) and use the functions h1(x) = x mod 2^n and h2(x) = x mod 2^(n+1). When a bucket overflows, the pointed bucket splits.

6.12.2 Consistent hash

Consistent Hashing is a technique used to distribute data across multiple nodes in a cluster. It involves mapping each node and data item to a point on a circle, and using the position of the item on the circle to determine which node it should be stored on. When a node is added or removed from the cluster, the items that were previously assigned to that node need to be redistributed across the remaining nodes. In this case, we choose the hash function to lie on a range that is large enough to cope with all our possible values. The circle arrangement means that, to determine the node in which to store an object, we do the following:
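Conceptually, an object is stored on the first bucket whose identifier follows its hash value on the circle, wrapping around if necessary. A minimal sketch of this lookup (the ring size and bucket ids match Example 6.4 further below):

# Consistent hash ring: each object goes to the first bucket after its hash value.
from bisect import bisect_right

RING_SIZE = 41
bucket_ids = [8, 16, 32, 40]                 # kept sorted

def h(x):
    return x % RING_SIZE

def bucket_for(x):
    i = bisect_right(bucket_ids, h(x))
    return bucket_ids[i % len(bucket_ids)]   # wrap around past the last bucket

print(bucket_for(18))                        # 32
bucket_ids = sorted(bucket_ids + [24])       # adding a node only moves its neighbours' keys
print(bucket_for(18))                        # 24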

6.13 Explain the structure of the HBase catalog

The HBase catalog is a set of internal tables that HBase uses to store metadata about the tables and regions in the cluster. The catalog is managed by the HMaster daemon and provides information about the location of regions, which servers are serving them, and which versions of the data are available.

The catalog consists of the following tables:

All of these tables are HBase tables themselves, and are stored in the same way as other tables in the cluster, with regions split across the available region servers. The ROOT and META tables are special in that they are stored in memory on the first region server in the cluster, and are not split into regions like other tables.

The catalog is an essential component of the HBase architecture, as it provides the means for clients to locate the regions that contain the data they need, and for HBase to manage the distribution and replication of data across the cluster.

Thus, the main structure is a three-level structure, consisting of the ROOT table, the META table and the data itself.

6.14 Explain the mistake compensation mechanism of the cache in HBase client

HBase implements a client cache similar to that of HDFS, to avoid constantly disturbing the coordinator. Thus, only the first time a key is requested by an application does the request need to go down the tree structure of metadata. Eventually, some RegionServer will send the corresponding data to the client, which records in its cache which RegionServer served it. In successive requests, that key will be found in the client cache and the request will be addressed directly to the right RegionServer.

The tree structure of HBase is more volatile than the HDFS directory, as well as multilevel; two facts that complicate its management. It can happen that, when the application finds the key in the cache and requests it from the corresponding RegionServer, this server no longer holds that key. In this case, the RegionServer itself escalates the request up the tree structure to its parent.

In the worst case, the key would not be under the parent either, and the request would be propagated up to the RootRegion. Since the RootRegion has the information of the whole domain of keys, it is guaranteed to be able to forward the request down the tree to the appropriate MetaRegion, and this one to the RegionServer that now holds the key, which will directly send the corresponding value to the client. Since we are assuming a three-level tree, these compensation actions will require at most four extra calls between the different RegionServers.

6.15 Enumerate the ACID guarantees provided by HBase

6.16 Explain the execution flow of an HBase query both at global and local levels

The execution flow of an HBase query can be divided into two levels: global and local.

Global level:

  1. Client sends a query to the HBase RegionServer that owns the row key or range of row keys being queried.

  2. When a query is received by a HBase RegionServer, the Yet Another Resource Negotiator (YARN) resource manager is consulted to determine if there are available resources to handle the query.

  3. The RegionServer consults the HBase catalog to locate the region(s) that contain the data being queried. This allows for inter-query parallelism, as different queries can go to different regions. Moreover, if read replicas are enabled, read-only queries can be further parallelized.

  4. The RegionServer forwards the query to each of the relevant region servers that are serving the region(s) containing the data. Here, intra-query parallelism is possible if the domain of the keys is appropriately set. Nonetheless, in general this is not possible.

  5. Each RegionServer performs the query locally and sends the results back to the RegionServer that received the query.

  6. The RegionServer aggregates the results from the different regions and sends the final result back to the client.

Local level:

  1. When a query is received by a region server, it consults its own in-memory cache to check if the required data is already present in memory.

  2. If the required data is not present in memory, the region server reads the required data from disk and caches it in memory for future use.

  3. The region server performs the required operations on the cached data and returns the result to the RegionServer that received the query.

  4. If required, the region server may write any changes back to disk after the query is complete.

6.17 Given few queries, define the best logical structure of a table considering its physical implications in terms of performance

6.18 Given the data in two leafs of a Log-Structured Merge-tree (LSM-tree), merge them

6.19 Given the current structure of a Linear Hash, modify it according to insertions potentially adding buckets

To modify the Linear Hash structure to handle insertions potentially adding buckets, first we need some definitions:

When we receive a new record, we follow the steps:

  1. We compute h1(x). If the corresponding bucket is already split, we take h2(x).

  2. If the bucket given by hi(x) is not full, we insert record x in it.

  3. Otherwise, we split the pointed bucket and create a new overflow bucket (if it did not exist before) connected to the bucket that is full.

    1. We insert x in this overflow bucket.

    2. We redistribute the values of the split bucket, applying h2 to them and moving them as needed.

Eventually, we will have space to allocate the new records.

Example 6.3. Let’s see one example. We have the following situation:



Buckets

B0: 2, 4   (pointed bucket)
B1: 3, 5

Now, we want to insert key 9. Then, the result would be:

Buckets

B0: 4
B1: 3, 5   → Overflow: 9
B2: 2




Say we now insert 11. Then:



Buckets

B0: 4
B1: 5, 9
B2: 2
B3: 3, 11


As we have split all the initial buckets, we increase n by 1 and reset the pointer to bucket 0.

6.20 Given the current structure of a Consistent Hash, modify it in case of adding a bucket

If we add a bucket, k, to the current structure of a Consistent Hash, it will be located between two buckets, j and j - 1. Thus:

Example 6.4. Imagine that a sufficiently large range is 41, and we take h(x) = x mod 41. We have the following structure:



BucketId   Objects
8          2, 5, 7
16         8, 13, 14
32         18, 19, 21, 25, 27, 29
40         34, 37, 38


We want to add a new node, with id 24:



BucketId   Objects
8          2, 5, 7
16         8, 13, 14
24         18, 19, 21
32         25, 27, 29
40         34, 37, 38


6.21 Calculate the number of round trips needed in case of mistake compensation of the tree metadata

6.22 Use HBase shell to create a table and access it

6.23 Use HBase API to create a table and access it

7 Document Stores: MongoDB

7.1 Explain the main difference between key-value and document stores

Document stores are essentially key-value stores in which the value is a document, i.e., it follows a known syntax. This makes it possible to define secondary indexes.

In a key-value store, data is stored as a collection of key-value pairs, where each key is unique and each value is associated with that key. These stores are optimized for high-speed access and retrieval of data based on a specific key. They are often used for simple data retrieval tasks, such as caching or storing user session data.

On the other hand, document stores are designed to handle more complex data structures, such as documents or JSON objects. In a document store, data is stored as collections of documents, which can contain nested data structures and arrays. These stores are optimized for more complex data retrieval and querying operations, making them a better choice for applications that require more advanced data processing capabilities, such as content management systems or e-commerce platforms.

7.2 Explain the main resemblances and differences between XML and JSON documents

XML and JSON are both widely used data interchange formats that are used to represent and transmit structured data between different systems. While both formats serve a similar purpose, they have some key differences and similarities.

Similarities:

Differences:

7.3 Explain the design principle of documents

The design principle of document stores is based on the idea of storing data as documents rather than in a relational table structure.

To solve the impedance mismatch problem, documents break 1NF. This avoids joins, so that we can retrieve all the data needed with a single fetch, and use indexes to identify data at a finer granularity.

7.4 Name 3 consequences of the design principle of a document store

7.5 Explain the difference between relational foreign keys and document references

In a relational database, foreign keys are used to establish relationships between tables. A foreign key is a field in one table that refers to the primary key of another table. For example, if you have a table of customers and a table of orders, you might have a foreign key field in the orders table that refers to the primary key of the customers table. This allows you to associate orders with customers and perform queries that join the two tables based on the foreign key relationship.

In a document store, document references are used to establish relationships between documents. A document reference is a field in one document that refers to the ID of another document. For example, if you have a collection of blog posts and a collection of comments, you might have a document reference field in each comment document that refers to the ID of the blog post it relates to. This allows you to associate comments with blog posts and perform queries that fetch all comments for a given blog post.

The main difference between relational foreign keys and document references is that foreign keys are based on a strict, predefined schema, while document references allow for more flexible and dynamic data models. In a relational database, the schema is fixed and you must define foreign keys between tables before data can be inserted. In a document store, the schema is more flexible, and you can add document references to establish relationships between documents as needed.

Another difference is that foreign keys are typically used to enforce referential integrity between tables, which means that you can’t insert a row in the orders table unless the corresponding customer exists in the customers table. Document references, on the other hand, do not enforce referential integrity in the same way, and it’s possible to have document references to non-existent documents. However, some document stores do offer features to enforce referential integrity, such as cascading deletes or validation rules on document references.
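As a hypothetical illustration (the collections and field names are made up), a comment references its blog post simply by carrying the post’s identifier, and nothing prevents a dangling reference:

# Two documents related through a reference field rather than a foreign key.
post = {"_id": 1, "title": "Big Data Management", "author": "alice"}
comment = {"_id": 101, "post_id": 1, "text": "Great summary!"}        # valid reference
orphan = {"_id": 102, "post_id": 999, "text": "Dangling reference"}   # accepted anyway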

7.6 Exemplify 6 alternatives in deciding the structure of a document

  1. Schema variability: in semi-structured data, the schema can potentially be specific to every document. This entails:

    1. Metadata embedding: Suppose you are designing a document to store information about a book, such as the title, author, and publisher. In a semi-structured data model, you may choose to embed metadata within the document itself, rather than storing it separately in a schema. For example, you may include fields such as "creation date" or "last updated by" within the document.

    2. Attribute optionality: Continuing with the example of a book document, you may choose to make certain attributes optional, depending on the nature of the data. For example, you may not always have information about the edition or the ISBN number, so you may choose to make these fields optional in the document schema.

  2. Schema declaration: a priori schema declaration is just optional and flexible in semi-structured data. This includes the declaration of:

    1. Structure and data types: Suppose you are designing a document to store information about a customer, such as their name, address, and order history. In a semi-structured data model, you may choose to declare a basic schema for the document, but allow for flexibility in the specific fields and data types used. For example, you may declare that the document should have a "name" field of type string, but allow for variation in the format or content of the name (e.g. first name, last name, or both).

    2. Integrity constraints: Continuing with the example of a customer document, you may choose to impose certain integrity constraints on the data, such as ensuring that the customer’s email address is unique across all documents. However, you may also allow for flexibility in the data model by not enforcing constraints that are not critical to the business logic.

  3. Structure complexity: complex nesting can be used in semi-structured data. This includes the representation of:

    1. Nested structures: Suppose you are designing a document to store information about a company’s organizational structure, such as the departments, managers, and employees. In a semi-structured data model, you may choose to represent this structure using nested objects or arrays, to allow for flexibility in the depth and complexity of the organizational hierarchy.

    2. Multi-valued attributes: Continuing with the example of an organizational structure document, you may choose to use multi-valued attributes to represent relationships between entities. For example, you may include a field for "direct reports" in the manager object, which can contain an array of employee objects representing the manager’s subordinates.
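For instance, a hypothetical document (names made up) combining the nested structure and the multi-valued attribute described above could look as follows:

# An organizational-structure document with nesting and a multi-valued attribute.
manager = {
    "name": "Dana",
    "department": "Engineering",
    "direct_reports": [                       # multi-valued attribute
        {"name": "Eve", "role": "Data engineer"},
        {"name": "Frank", "role": "ML engineer",
         "address": {"city": "Barcelona"}},   # nested structure, optional field
    ],
}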

7.7 Explain the difference between JSON and BSON

JSON (JavaScript Object Notation) and BSON (Binary JSON) are both data interchange formats used to represent and transmit semi-structured data. The main difference between the two is that BSON is a binary format, whereas JSON is a text-based, human-readable format.

In more detail, the main differences between JSON and BSON are as follows:

7.8 Name the main functional components of MongoDB architecture

We distinguish between machines that contain data, organized in replica sets, and those that purely route queries, known as mongos.

  1. Replica sets:

    1. Config servers: contain the global catalog, which keeps track of existing shards.

    2. Shards: the components that actually store data.

    3. Balancer: a process inside the primary config server in charge of detecting unbalanced shards and moving chunks from one shard to another. This allows shards to split or migrate chunks of data between different machines.

  2. Mongos: they split the queries and merge back the results. To make this more efficient and avoid disturbing the coordinator, they maintain a cache of the shards, which is lazily synchronized. Typically, mongos sit in the client machine to avoid network traffic.

In addition, we have:

  1. Clients: MongoDB clients are applications or tools that interact with the MongoDB database. Clients can communicate with MongoDB using various drivers and APIs provided by MongoDB.

  2. Mongod: The mongod process is the main component of the MongoDB server. It manages the data stored in the database, handles read and write requests, and interacts with clients.

7.9 Explain the role of ’mongos’ in query processing

In MongoDB, ’mongos’ is a component of the architecture that acts as a query router in sharded clusters. Its main role is to route incoming client requests to the appropriate shard(s) in the cluster.

When a client sends a query request to the mongos process, mongos first checks whether the query includes the shard key. If the query includes the shard key, mongos routes the request directly to the appropriate shard(s) based on the shard key value. If the query does not include the shard key, mongos sends the query to all shards in the cluster and aggregates the results before returning them to the client.

Mongos also performs other important functions in query processing, including load balancing and query optimization. Specifically, mongos balances the load across different shards by distributing incoming queries evenly across all available shards. This helps to ensure that no single shard is overloaded with too many queries, which can lead to performance issues.

In addition, mongos performs query optimization by analyzing incoming queries and determining the most efficient way to process them. This involves selecting the appropriate indexes to use and optimizing the order of operations to minimize the number of documents that need to be scanned. The mechanism is rather simple and just pushes the first selection and projection to the shards.

This provides inter-query parallelism, since different routers and replicas can serve different users, but not inter-operator parallelism, since all remaining operators run in the router once the first operation in the shards has finished.

It also offers intra-operator parallelism in case of static fragmentation, as different shards would serve different pieces of the collection in parallel for the same query.

7.10 Explain what a replica set is in MongoDB

In MongoDB, a replica set is a group of MongoDB servers that maintain identical copies of the same data. A replica set provides high availability and automatic failover in case of server failures.

A replica set consists of several MongoDB instances, or nodes, that are configured to communicate with each other. One node is designated as the primary node, and the others are secondary nodes. The primary node is responsible for receiving write operations and applying them to the data set. The secondary nodes replicate the data from the primary node and can serve read operations.

If the primary node fails or becomes unavailable, one of the secondary nodes is automatically elected as the new primary node. This process is called failover, and it ensures that the replica set can continue to function even in the event of a node failure. Once the failed node is restored, it can rejoin the replica set and serve as a secondary node.

Replica sets provide several benefits in MongoDB, including high availability, fault tolerance, and scalability. They are often used in production environments to ensure that the database can continue to operate even in the face of hardware or network failures.

7.11 Name the three storage engines in MongoDB

MongoDB provides three storage engines to manage data:

Each storage engine has its own advantages and disadvantages, and the choice of storage engine depends on the specific use case and workload. For most workloads, the WiredTiger storage engine is recommended as it provides a good balance of performance, scalability, and reliability.

7.12 Explain what shard and chunk are in MongoDB

In MongoDB, sharding is a method for distributing data across multiple servers, or shards, to improve performance and scalability. A shard is a single MongoDB instance that stores a portion of the data.

A chunk is a contiguous range of data within a shard that is assigned to a specific shard key range. The shard key is a unique identifier used to distribute data across shards in a sharded cluster. The data in a MongoDB collection is partitioned into chunks based on the shard key value. Each chunk represents a range of shard key values that is managed by a specific shard. The chunk size is dynamically managed by the MongoDB balancer, which redistributes chunks across shards as the distribution of data changes over time. When the size of a chunk grows beyond a certain limit, the balancer splits the chunk into two smaller chunks, which are then assigned to different shards. Similarly, when the size of a chunk shrinks below a certain limit, the balancer merges the chunk with an adjacent chunk and assigns the new larger chunk to a single shard.

The use of shards and chunks allows MongoDB to horizontally scale databases to handle large volumes of data and high write and read request rates. By distributing data across multiple shards, MongoDB can provide better performance, availability, and scalability compared to a single monolithic database.

7.13 Explain the two horizontal fragmentation mechanisms in MongoDB

In MongoDB, horizontal fragmentation, or sharding, is a mechanism for dividing data across multiple servers, or shards, to improve performance and scalability. MongoDB provides two mechanisms for horizontal fragmentation:

  1. Range-based sharding: This mechanism partitions data based on a specified shard key range. Each shard is responsible for a specific range of shard key values. For example, if the shard key is a timestamp, a range-based sharding strategy can be used to split data by time intervals such as hours, days, or months.

  2. Hash-based sharding: This mechanism partitions data based on a hash of the shard key value. Each shard is responsible for a specific range of hash values. Hash-based sharding can be useful when the shard key values are not evenly distributed, as it ensures a more balanced distribution of data across shards.

Both range-based and hash-based sharding mechanisms have their own advantages and disadvantages. Range-based sharding is useful when the data is naturally partitioned into ranges, such as by time, and it can be easier to manage and monitor. Hash-based sharding can provide a more balanced distribution of data across shards and can be more flexible in handling changes in the data distribution over time.

In both mechanisms, MongoDB uses a shard key to determine how to distribute the data across shards. The shard key is a field in the data that is used to partition the data into chunks, which are then distributed across the shards. The choice of shard key is an important factor in determining the performance and scalability of a sharded MongoDB cluster. It is a mandatory attribute in all the documents of the collection, and must be indexed. It can be chosen by calling

sh.shardCollection(⟨namespace⟩, ⟨key⟩).

Note: there is no vertical fragmentation.
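As a hedged sketch, the same choice can be made from Python with pymongo against a mongos router; the database, collection and key names below ("mydb", "events", "ts", "users", "user_id") are made up for the example:

# Enable sharding and shard two collections, one by range and one by hash.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")   # assumed mongos address
client.admin.command("enableSharding", "mydb")
# Range-based sharding on a timestamp attribute:
client.admin.command("shardCollection", "mydb.events", key={"ts": 1})
# Hash-based sharding on a user identifier:
client.admin.command("shardCollection", "mydb.users", key={"user_id": "hashed"})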

7.14 Explain how the catalog works in MongoDB

In MongoDB, the catalog is a metadata repository that stores information about the databases, collections, indexes, and other objects in the database. The catalog is used by MongoDB to manage and optimize the performance of database operations.

The catalog information in MongoDB is treated like any other piece of data. The data is stored in a replica set, and consequently enjoys all its synchronization benefits and consequences. The only specificity is that its information is cached in the routers. The behavior in MongoDB is also Lazy/Primary-copy.

The catalog is updated automatically by MongoDB as new databases, collections, and indexes are created, modified, or deleted. MongoDB also uses the catalog to optimize database operations, such as query planning and execution, by analyzing the metadata stored in the catalog.

Developers and database administrators can also query the catalog to retrieve information about the databases, collections, indexes, and other objects in the MongoDB instance. This information can be used to monitor and troubleshoot the performance of database operations and to optimize the schema and indexing strategies for the MongoDB collections.

7.15 Identify the characteristics of the replica synchronization management in MongoDB

Replication is based on replica sets, which are sets of mongod instances (typically three) that act in a coordinated way. A shard sitting in a replica set means that its data is mirrored in all the nodes that belong to that replica set. Since replica sets are disjoint, scaling by sharding is very expensive in terms of the number of machines; it may be better to simply add more memory to a single machine.

The replica synchronization management in MongoDB has the following characteristics:

7.16 Explain how primary copy failure is managed in MongoDB

In MongoDB, a primary copy failure is managed through a process called automatic failover. When the primary node fails, the other nodes in the replica set elect a new primary node. The election process is based on a consensus protocol that ensures that the new primary node is chosen by a majority of the nodes in the replica set.

The election process works as follows:

  1. The nodes in the replica set communicate with each other to determine the status of the primary node. If the primary node fails to respond, the other nodes detect the failure and initiate an election.

  2. Each node that is eligible to become the primary node (i.e., a node that has a copy of the data and is up-to-date with the oplog) casts a vote for itself.

  3. The nodes communicate with each other to determine the node with the most votes. If a node receives a majority of the votes (i.e., more than half of the nodes in the replica set), it becomes the new primary node.

  4. If no node receives a majority of the votes, the election fails and the replica set cannot elect a new primary node. In this case, the administrators must intervene to resolve the issue.

Once a new primary node is elected, the other nodes in the replica set update their configurations to recognize the new primary node. The new primary node then starts accepting write operations and propagating data changes to the other nodes in the replica set.

In addition to automatic failover, MongoDB provides several features to minimize the risk of primary copy failure. These include:

7.17 Name the three query mechanisms of MongoDB

The three query mechanisms of MongoDB are:

7.18 Explain the query optimization mechanism of MongoDB

MongoDB’s query optimization mechanism is designed to minimize the time and resources required to execute queries. The query optimizer takes into account various factors when choosing an execution plan, such as the available indexes, the size of the collection, and the complexity of the query.

When a query is submitted to MongoDB, the query optimizer analyzes the query and selects an execution plan that is optimal for the query. The execution plan consists of a sequence of operations that MongoDB performs to retrieve the requested data. These operations can include scanning indexes, filtering data, and sorting data.

The query optimizer chooses the execution plan that minimizes the number of operations required to execute the query and the amount of data that needs to be scanned. This results in faster query execution and reduces the load on the system.

MongoDB’s query optimizer uses a variety of techniques to optimize queries, including:

7.19 Given two alternative structures of a document, explain the performance impact of the choice in a given setting

7.20 Simulate splitting and migration of chunks in MongoDB

7.21 Configure the number of replicas needed for confirmation on both reading and writing in a given scenario

7.22 Perform some queries on MongoDB through the shell and aggregation framework

7.23 Compare the access costs given different document designs

7.24 Compare the access costs with different indexing strategies (i.e. hash and range based)

7.25 Compare the access costs with different sharding distributions (i.e. balanced and unbalanced)

8 MapReduce I

8.1 Enumerate several use cases of MapReduce

MapReduce is based on a development by Google and was originally conceived to compute PageRank. Nonetheless, it can be used in many different situations:

8.2 Explain 6 benefits of using MapReduce

8.3 Describe what the MapReduce is in the context of a DDBMS

In the context of a DDBMS, this framework introduces some changes:

  1. Elimination of the global scheduler: as data is only going to be read, no inconsistencies can arise, making the scheduler unnecessary.

  2. Elimination of the global query manager: it is the task of the developer to decide how a query should be executed.

  3. MapReduce substitutes the Global Execution Manager.

  4. Disregard the complexities of the DDBMS, by implementing MapReduce on top of Hadoop.

8.4 Recognize the signature of Map and Reduce functions

A Map is a function that takes a pair ⟨key,value⟩ of the input domain and obtains a set of zero or more new ⟨key,value⟩ pairs of the output domain:

f : TIK × TIV → 2^(TOK × TOV)
    ⟨k, v⟩ ↦ {⟨k′1, v′1⟩, ..., ⟨k′n, v′n⟩},

here TIK is the domain of input keys, TIV the domain of input values, TOK the domain of output keys and TOV the domain of output values.

A Reduce is a function that takes all the pairs with the same key and returns a new set of ⟨key,value⟩ pairs combining them:

f : TOK × 2^TOV → 2^(TFK × TFV)
    ⟨k, {v1, ..., vk}⟩ ↦ {⟨k′1, v′1⟩, ..., ⟨k′n, v′n⟩},

here TFK are the final keys and TFV the final values.
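For instance, a word-count Map and Reduce matching these signatures can be sketched in Python-style pseudo-code (assuming the input key is a line offset and the input value the line text):

def map_fn(key, value):          # key: line offset, value: line of text
    for word in value.split():
        yield (word, 1)          # emit one <word, 1> pair per occurrence

def reduce_fn(key, values):      # key: word, values: all counts emitted for it
    yield (key, sum(values))     # final <word, count> pair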

8.5 Explain the phases of a MapReduce operation

8.6 Justify to which extent MapReduce is generic

8.7 Simulate the execution of a simple MapReduce algorithm from the user (agnostic of implementation details) perspective

8.8 Identify the usefulness of MapReduce in a given use case

8.9 Define the key in the output of the map for a simple problem

8.10 Provide the pseudo-code of map and reduce functions for a simple problem

9 MapReduce II

9.1 Enumerate the different kind of processes in Hadoop MapReduce

In the Hadoop MapReduce framework, there are primarily three types of processes:

9.2 Draw the hierarchy of Hadoop MapReduce objects

9.3 Explain the information kept in the Hadoop MapReduce coordinator node

The Hadoop MapReduce coordinator node, also known as the JobTracker, keeps track of the following information:

  1. Map and Reduce tasks: It maintains the status (idle, in-progress, or completed) and the identity of each worker machine assigned to map or reduce tasks.

  2. Intermediate file regions: It records the location and size of each intermediate file region produced by each map task, storing O(M × R ) states in memory, as there are R files generated by each of the M mappers.

9.4 Explain how to decide the number of mappers and reducers

The number of map tasks depends on the splitting of the input, which is by default performed per HDFS block. A general rule of thumb is to have one map task per HDFS block. Ideally, each node should run between ten and a hundred mappers, each taking more than one minute to execute to justify its creation overhead.

For reducers, there are two suggested configurations:

9.5 Explain the fault tolerance mechanisms in Hadoop MapReduce

9.5.1 Worker Failure

The corresponding task is immediately reassigned, which is possible because all files are replicated in HDFS.

9.5.2 Master Failure

Master failure is less likely but can be mitigated by creating checkpoints of its in-memory structure that tracks the status of tasks and intermediate files. In case of failure, another node can be designated as the new master and continue the execution from the last checkpoint.

9.6 Identify query shipping and data shipping in MapReduce

Query shipping occurs in the map phase of MapReduce, where the map tasks are shipped to the nodes where the input data is stored.

Data shipping occurs during the shuffle phase of MapReduce, when intermediate data is moved from the mapper nodes to the corresponding reducer nodes.

9.7 Explain the effect of using the combine function in MapReduce

The combine function acts as a local reduce function for each mapper, helping to minimize the amount of key-value pairs stored on disk and transferred over the network. The combine function can be the same as the reduce function if it is commutative and associative. If not, it must be designed appropriately to ensure the final outcome remains unchanged while reducing intermediate computations.

9.8 Identify the synchronization barriers of MapReduce

  1. All input data must be uploaded to the HDFS before any processing can begin.

  2. All mappers must finish before reducers can start processing the data.

  3. When chaining multiple MapReduce jobs, the subsequent job cannot start until the previous one finishes writing its output.

9.9 Explain the main problems and limitations of Hadoop MapReduce

9.10 Apply the different steps of a MapReduce execution at the implementation level

9.11 Decide on the use of the combine function

10 Spark I

10.1 Name the main Spark contributions and characteristics

10.2 Compare MapReduce and Spark




                       MapReduce                       Spark
Data storage           Disk                            In-memory
Processing model       Two-stage                       Multi-stage with DAGs
Ease of use            Java                            Multiple languages
Libraries              Relies on external libraries    Built-in libraries for ML, graphs, streams
Iterative processing   Costly                          Cheaper



10.3 Define a dataframe

Dataframes offer a symmetric treatment of rows and columns, both of which can be referenced explicitly by position or by name. The data stored in a dataframe has to adhere to a schema, but this is defined at runtime, making it useful for data cleaning. Dataframes offer a great variety of operations, enabling us to perform relational-like operations, spreadsheet operations and linear algebra operations. Also, their query syntax is incrementally composable, and dataframes can be natively embedded in an imperative language.

10.4 Distinguish dataframe from relation and matrix

                      Pandas DF                Spark DF                 Matrix           Relation
Types                 Heterogeneously typed    Heterogeneously typed    Homogeneously typed    Heterogeneously typed
Schema                Lazily induced           Lazily induced           -                Rigid
Value types           Numeric and non-numeric  Numeric and non-numeric  Numeric          Numeric and non-numeric
Column names          Explicit                 Explicit                 No names         Explicit
Relational algebra    Supported                Supported                Not supported    Supported
Row order             Ordered                  Unordered                Ordered          Unordered
Row names             Named rows               Unnamed rows             No names         Unnamed rows
Column-row symmetry   Symmetric                Columns and rows differ  Symmetric        Columns and rows differ
Linear algebra        Supported                Not supported            Supported        Not supported

10.5 Distinguish Spark and Pandas dataframe

10.6 Enumerate some abstraction on top of Spark

  1. Resilient Distributed Datasets (RDDs): RDDs are the fundamental abstraction in Spark, representing an immutable distributed collection of objects that can be processed in parallel. RDDs provide fault tolerance through lineage information and can be cached across multiple stages for iterative algorithms. They support low-level operations like map, filter, and reduce, allowing developers to have fine-grained control over data processing.

  2. DataFrames: DataFrames are a higher-level abstraction built on top of RDDs. They represent a distributed collection of data organized into named columns, similar to a relational database table. DataFrames provide a convenient API for handling structured and semi-structured data and allow for optimizations through the Catalyst query optimizer and the Tungsten execution engine. Operations like filtering, aggregation, and transformation are available through the DataFrame API.

  3. Spark SQL: Spark SQL is a module that provides a programming interface for working with structured and semi-structured data. It allows users to query data using SQL as well as the DataFrame API. Spark SQL integrates with the Spark ecosystem, enabling the use of SQL queries with other Spark libraries like MLlib and GraphX. It also supports reading from and writing to various data formats and storage systems, such as Parquet, Avro, JSON, Hive, HBase, and JDBC.

  4. MLlib: MLlib is Spark’s built-in library for scalable machine learning. It provides various machine learning algorithms for classification, regression, clustering, and recommendation, as well as tools for feature extraction, transformation, and model evaluation. MLlib is designed to scale out across a cluster, enabling the processing of large datasets for training and prediction tasks. It supports both RDD-based and DataFrame-based APIs.

  5. GraphX: GraphX is a library for graph processing and computation built on top of Spark. It allows users to work with graphs and perform graph-parallel computations at scale. GraphX provides a flexible graph computation API that enables users to express graph algorithms like PageRank, connected components, and triangle counting. It also includes a collection of graph algorithms and builders to simplify graph analytics tasks.

  6. Structured Streaming: Structured Streaming is a module for processing real-time data streams in a fault-tolerant and scalable manner. It provides a high-level API built on top of DataFrames, allowing users to express complex streaming computations using the same operations as batch processing. Structured Streaming handles the incremental processing of data streams, providing exactly-once processing guarantees and allowing for event-time and late-data processing. It supports various sources and sinks, such as Kafka, HDFS, and Delta Lake.

10.7 Provide the Spark pseudo-code for a simple problem using dataframes

11 Spark II

11.1 Define RDD

RDD stands for Resilient Distributed Dataset. It is the fundamental abstraction in Spark, representing an immutable distributed collection of objects that can be processed in parallel. RDDs provide fault tolerance through lineage information, allowing for data recovery in case of failures.

11.2 Distinguish between Base RDD and Pair RDD

11.3 Distinguish between transformations and actions

Transformations are operations that create a new RDD from an existing one. They are performed lazily, meaning they are only executed when an action is called. Examples of transformations include map, filter, and reduceByKey.

Actions are operations that return a value to the driver program or write data to external storage systems. Actions trigger the execution of transformations. Examples of actions include count, collect, and saveAsTextFile.

11.4 Explain available transformations

11.5 Explain available actions

11.6 Name the main Spark runtime components

11.7 Explain how to manage parallelism in Spark

Parallelism in Spark can be managed by controlling the number of partitions and the number of cores used by each executor. You can set the default number of partitions when creating an RDD or by repartitioning an existing RDD. You can also configure the number of cores used by each executor in the Spark configuration.

11.8 Explain how recoverability works in Spark

Recoverability in Spark is achieved through RDD lineage information, which records the sequence of transformations used to create an RDD. If a partition of an RDD is lost due to a node failure, Spark can recompute the partition using the lineage information and the transformations applied to the original data. This allows Spark to recover lost data without the need for data replication, reducing overhead and improving fault tolerance.

Also, data can be cached/persisted in up to two nodes. As a rule of thumb, we should cache an RDD if it is the parent of more than one RDD.

11.9 Distinguish between narrow and wide dependencies

11.10 Name the two mechanisms to share variables

11.11 Provide the Spark pseudo-code for a simple problem using RDDs

Problem: Word count using RDDs.

from pyspark import SparkContext

# Create a Spark context
sc = SparkContext("local", "WordCount")

# Read the input text file
text_file = sc.textFile("input.txt")

# Split each line into words, and map each word to a (word, 1) tuple
word_counts = (text_file.flatMap(lambda line: line.split(" "))
                        .map(lambda word: (word, 1))
                        .reduceByKey(lambda a, b: a + b))

# Save the word count results as a text file
word_counts.saveAsTextFile("output")

12 Stream Data Management

12.1 Define a data stream

A data stream is a dataset that is produced incrementally over time, instead of being fully available before its processing begins.

12.2 Distinguish the two kinds of stream management systems

12.3 Recognize the importance of stream management

Stream management is essential for handling continuous and dynamic data sources, enabling real-time decision making, and providing insights into the behavior of the system or environment. It is crucial in many applications, such as monitoring and control systems, financial markets, network traffic analysis, social media analysis, and Internet of Things (IoT) environments.

12.4 Enumerate the most relevant characteristics of streams

12.5 Explain to which extent a DBMS can manage streams

Using an RDBMS is possible in some exceptional cases in which some requirements can be relaxed.

Actually, active databases were the precursors of SPEs. Active databases have the goal of automatically triggering a response to monitored events, such as database updates, points in time or events external to the database. The operations provided by these databases are ECA rules, usually implemented via triggers, with the main objective of maintaining integrity constraints and derived information.

However, they fall short for more complex aggregations over time. Moreover, ACID transactions entail a large overhead on data ingestion, hindering the capacity to process large data streams arriving at a very high pace.

If the arrival rate is not very high, we can use temporary tables, available in many RDBMSs, whose operations are much faster, since they are single-user and kept in memory. This allows us to keep a tuple for every message in the stream, making the table analogous to a sliding window over the stream.

12.6 Name 10 differences between DBMS and SPE




                        DBMS               SPE
Data                    Persistent         Volatile
Access                  Random             Sequential
Queries                 One-time           Continuous
Support                 Unlimited disk     Limited RAM
Order                   Current state      Sorted
Ingestion rate          Relatively low     Extremely high
Temporal requirements   Little             Near-real-time
Accuracy                Exact data         Imprecise data
Heterogeneity           Structured data    Imperfections
Algorithms              Multiple passes    One pass



12.7 Characterize the kinds of queries in an SPE

We can classify stream operations according to three independent criteria:

The two most characteristic queries in stream processing are:

12.8 Explain the two parameters of a sliding window

The window size is the interval of time (in seconds) of historical data that is contained in each window before processing. The sliding interval is the amount of time (in seconds) by which the window shifts.

In plainer terms: the sliding interval indicates when we process, and the window size indicates how far back in time we take data into account. Note that if the interval is bigger than the size, we would be letting some data go by without processing it. This is exemplified in Figure 10, where the second windowed stream uses a tumbling window.


PIC

Figure 10: Window parameters visualization.


12.9 Explain the three stream architectural patterns

12.10 Explain the goals of Spark streaming architecture

Spark Streaming is an extension of the core Spark API, designed to handle near-real-time data processing by dividing incoming data streams into small batches and processing them using Spark’s core engine. The goals of Spark Streaming architecture are as follows:

12.11 Draw the architecture of Spark streaming

12.12 Identify the need of a stream ingestion pattern

The need for a stream ingestion pattern arises when you have to:

  1. Capture and store high-velocity data streams from various sources.

  2. Ensure data durability and availability for processing.

  3. Handle backpressure and avoid data loss due to sudden spikes in data rates.

  4. Manage data partitioning and replication for fault tolerance.

12.13 Identify the need of a near-real time processing pattern

The need for a near-real-time processing pattern arises when you have to:

  1. Process data as it arrives, providing quick insights and enabling real-time decision-making.

  2. Respond to events or anomalies in the data immediately.

  3. Continuously update the application state based on incoming data.

  4. Handle large-scale data streams with low latency requirements.

12.14 Identify the kind of message exchange pattern

The kind of message exchange pattern depends on the specific use case and system requirements. Common patterns include:

  1. Publish-subscribe: Producers publish messages to a shared topic, and consumers subscribe to topics to receive messages.

  2. Point-to-point: Producers send messages directly to specific consumers, with each message consumed by a single consumer.

  3. Request-reply: A synchronous pattern where the sender expects a response for each sent message.

12.15 Simulate the mesh-join algorithm

12.16 Estimate the cost of the mesh-join algorithm

12.17 Use windowing transformations in Spark streaming

from pyspark.streaming import StreamingContext
from pyspark import SparkContext

# Initialize Spark context and streaming context with a batch interval of 1 second
sc = SparkContext("local[*]", "WindowedWordCount")
ssc = StreamingContext(sc, 1)

# Set checkpoint directory for fault tolerance
ssc.checkpoint("checkpoint")

# Read data from a text stream
lines = ssc.socketTextStream("localhost", 9999)

# Split lines into words and create pairs (word, 1)
words = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1))

# Reduce word pairs within a window of 10 seconds and sliding interval of 2 seconds
windowed_word_counts = words.reduceByKeyAndWindow(lambda a, b: a + b, lambda a, b: a - b, windowDuration=10, slideDuration=2)

# Print the word counts within the window
windowed_word_counts.pprint()

# Start the streaming context and wait for it to terminate
ssc.start()
ssc.awaitTermination()

In this example, we first create a DStream from a socket text stream. We then split each line into words and create key-value pairs for each word. The key is the word itself, and the value is 1, representing a single occurrence of the word.

We then use the reduceByKeyAndWindow function to calculate the word count within a sliding window. In this case, the window duration is set to 10 seconds, and the slide duration is set to 2 seconds. The reduceByKeyAndWindow function takes two lambda functions as arguments - the first one is used to add the values when a new RDD enters the window, and the second one is used to subtract the values when an RDD exits the window.

Finally, we print the word counts within the window using the pprint function and start the streaming context. The streaming context will run until terminated by the user or an error occurs.

13 Data Stream Analysis

13.1 Explain the difference between generic one-pass algorithms and stream processing

One-pass algorithms and stream processing are both designed to handle data that can’t be stored in its entirety due to its size. However, they differ in their approach and specific use cases. A one-pass algorithm, as the name suggests, processes the data in one pass, typically reading from a dataset that’s stored in a file or a database. It aims to make the best decision or computation at each step with the information available, without the ability to revisit the data.

Stream processing, on the other hand, is a type of computing that deals with continuously incoming data streams. It processes the data on-the-go, often in real-time or near-real-time, without waiting for all data to arrive or be stored. Stream processing can handle ’infinite’ data streams, which are never-ending and timely, while one-pass algorithms are generally used with ’finite’ data sets.

13.2 Name the two challenges of stream processing

  1. Limited computation capacity: Stream processing involves real-time or near-real-time processing of data. The rate at which the data arrives might exceed the processing capability of the system, causing data loss or delayed processing.

  2. Limited memory capacity: In stream processing, data is continuously arriving. The system might not have enough memory to store all arriving elements, especially if the data arrival rate is very high, posing a challenge to keep track of the entire history of the stream.

13.3 Name two solutions to limited processing capacity

  1. Probabilistically drop some stream elements through uniform random sampling on the stream. A widely known technique is Load shedding.

  2. Filter out some elements of the stream based on some characteristic of the data. For example, using Bloom filters.

13.4 Name three solutions to limited memory capacity

  1. Focus on only part of the stream, ignoring the rest, by means of a Sliding window.

  2. Weight the elements of the stream depending on their position, so that we can keep track of the aggregate without keeping each and every element. Moreover, if we are not interested in the elements not present in the stream recently, we can define a threshold to filter them out. This is done with an exponentially decaying window.

  3. Keeping a summary structure that allows us to approximate the answer to some queries. These structures are usually called synopses.

13.5 Decide the probability of keeping a new element or removing an old one from memory to keep equi-probability on load shedding

If there are p memory positions and we have seen n elements in the stream, then the probability of keeping the new element n + 1 is

Pr_in = p / (n + 1),

and if it is kept, then we remove elements from memory with probability

Pr_out = 1 / p.

13.6 Decide the parameters of the hash function to get a representative result on load shedding

We should take a hash function mapping to a large number of values M, and keep only the elements whose hash value falls below a threshold t. We dynamically reduce t as we run out of memory.
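A possible Python sketch of this idea is shown below; the hash range M, the threshold policy (halving t) and the memory budget are assumptions made just for the illustration:

import hashlib

M = 2**32           # large hash range
t = M               # initial threshold: keep every element
MAX_KEPT = 10_000   # assumed memory budget
kept = {}

def h(key):
    # hash a key (approximately) uniformly into [0, M)
    return int(hashlib.md5(str(key).encode()).hexdigest(), 16) % M

def process(key, value):
    global t
    if h(key) < t:                    # keep only elements hashing below the threshold
        kept[key] = value
    if len(kept) > MAX_KEPT:          # running out of memory: halve the threshold...
        t //= 2
        for k in list(kept):          # ...and drop the elements that no longer pass it
            if h(k) >= t:
                del kept[k]

Hashing on the key (instead of sampling elements at random) guarantees that either all or none of the occurrences of a given key are kept, which keeps per-key statistics representative.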

13.7 Decide the optimum number of hash functions in a Bloom filter

If the filter has n bits and we expect m different key values, then the optimal number of hash functions is

k = (n / m) · ln 2.

13.8 Approximate the probability of false positives in a Bloom filter

The probability of false positives is around

Pr_FP ≈ (1 - e^{-k·m/n})^k,

where n is the size of the filter, m is the number of keys and k is the number of hash functions. In the case of optimal k, it is

Pr_FP = (1/2)^k ≈ 0.618^{n/m}.
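The following Python sketch ties both formulas together: it builds a toy Bloom filter with the (rounded) optimal number of hash functions and measures the observed false-positive rate; the class and the way the k hash functions are derived (salting a single hash) are assumptions made for the illustration:

import hashlib
import math

class BloomFilter:
    def __init__(self, n_bits, m_keys):
        self.n = n_bits
        self.bits = [False] * n_bits
        self.k = max(1, round(n_bits / m_keys * math.log(2)))  # optimal k = (n/m)·ln 2, rounded

    def _positions(self, key):
        # derive k hash values by salting a single hash function
        for i in range(self.k):
            digest = hashlib.sha256(f"{i}:{key}".encode()).hexdigest()
            yield int(digest, 16) % self.n

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos] = True

    def might_contain(self, key):
        # False means definitely absent; True may be a false positive
        return all(self.bits[pos] for pos in self._positions(key))

# n/m = 8 bits per expected key, so Pr_FP should be around 0.618^8 ~ 0.02
bf = BloomFilter(n_bits=80_000, m_keys=10_000)
for key in range(10_000):
    bf.add(key)
false_positives = sum(bf.might_contain(key) for key in range(10_000, 20_000))
print(f"observed false-positive rate: {false_positives / 10_000:.3f}")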

13.9 Calculate the weighted average of an attribute considering an exponentially decaying function

Avg_{T+1} = Σ_{i=0}^{T+1} a_i·(1 - c)^{T+1-i} = (Σ_{i=0}^{T} a_i·(1 - c)^{T-i})·(1 - c) + a_{T+1} = Avg_T·(1 - c) + a_{T+1},

where c is a small constant, T is the current time, a_t is the element arriving at time t (or 0 if there is no element), and g(T - t) = (1 - c)^{T-t} is the weight at time T of an item obtained at time t.
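A minimal Python sketch of the incremental update follows (note that, as in the formula above, this maintains a decayed sum rather than a normalized average); the function name and the constant c = 0.01 are chosen just for the example:

def decayed_aggregate(stream, c=0.01):
    # Maintains Avg_{T+1} = Avg_T * (1 - c) + a_{T+1} incrementally.
    avg = 0.0
    for a in stream:        # a is the element arriving at the current time step (0 if none)
        avg = avg * (1 - c) + a
        yield avg

# Example: for a constant stream of 1s, the aggregate converges towards 1/c = 100
for t, value in enumerate(decayed_aggregate([1] * 500), start=1):
    if t % 100 == 0:
        print(t, round(value, 2))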

13.10 Decide if heavy hitters will show false positives

It is possible to get false positives, depending on the stream: an element that is not actually frequent may survive in the summary and be reported as a heavy hitter.
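The course does not fix a particular heavy-hitters algorithm here, so as an illustration the following Python sketch uses the classical Misra-Gries (frequent items) summary: every true heavy hitter survives, but a surviving counter may correspond to an element that is not actually frequent, i.e., a false positive:

from collections import Counter

def misra_gries(stream, k):
    # Keeps at most k-1 counters; every element with frequency > len(stream)/k survives,
    # but survivors are not guaranteed to be that frequent (possible false positives).
    counters = {}
    for x in stream:
        if x in counters:
            counters[x] += 1
        elif len(counters) < k - 1:
            counters[x] = 1
        else:                              # decrement all counters; drop those reaching 0
            for key in list(counters):
                counters[key] -= 1
                if counters[key] == 0:
                    del counters[key]
    return counters

stream = ["a"] * 50 + ["b", "c", "d", "e", "f"] * 4 + ["g"]
print(misra_gries(stream, k=3))   # 'a' is a true heavy hitter; 'g' survives with count 1 (a false positive)
print(Counter(stream))            # exact counts, for comparison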

14 Big Data Architectures

14.1 Explain the problem of a spaghetti architecture

In Big Data contexts, it is common to find different requirements for different tasks, as well as different tools that provide means to perform these tasks, with different capabilities. Therefore, we can have different tasks spread over different independent and heterogeneous systems. For example, just looking at Hadoop, we find HDFS and HBase for storage, HCatalog for modeling, Sqoop for ingestion, Spark for processing and SparQL for querying.

The spaghetti architecture arises when we solve our tasks' requirements one by one, choosing what we consider the best option for each specific task at that moment. This implies that we end up using tens of different tools, making integration more costly, creating artificially complex pipelines, and hindering the scalability of the system as a whole. Reference architectural designs arise to avoid this problem.

14.2 Explain the need of the Lambda Architecture

The Lambda Architecture addresses the problem of computing arbitrary functions on arbitrary data in real-time by combining both batch and stream processing methods. This makes it possible to handle both low-latency real-time data and keep the benefits of batch processing for more complex analytics.

The batch layer stores all incoming data and allows computation over large data sets, while the speed layer deals with recent data to provide real-time views. This allows the system to provide precomputed views from the batch layer and recent-data views from the speed layer, meeting the demands of both latency and accuracy.

14.3 Explain the difference between Kappa and Lambda Architectures

The Lambda Architecture includes two layers, the batch and the speed layer, while the Kappa Architecture only uses a streaming layer. Lambda uses batch processing for handling large data sets and stream processing for real-time data, while Kappa uses stream processing for all data. Kappa Architecture is often simpler because it only requires maintaining one system, but it may not be suitable for all kinds of computations, especially for those that are naturally easier to express as batch computations.

14.4 Justify the need of a Data Lake

The main idea of a Data Lake is load-first, model-later. In traditional analytical frameworks, the approach is to model the schema beforehand, in a model-first, load-later manner. This restricts the potential analyses that can be done on the data, since all information that is not compliant with the defined schema is lost. The idea is therefore to store the raw data and then create on-demand views of it to support any analysis we might need in the future. The problems of the model-first approach, usual in traditional Data Warehouse systems, are that the transformations become permanent, the schema is fixed, and the users of the data need to comply with that schema as well, forcing them to learn the schema, what can be done, and what cannot (high entry barriers).

14.5 Identify the difficulties of a Data Lake

If we don't organize the Data Lake properly, it can become really hard to track all the data stored in it, effectively turning the Data Lake into what is called a Data Swamp. The second problem is that, since each required analysis needs specific processing, it is easy to end up with lots of different transformation pipelines, which are very case-specific and hard to reuse.

14.6 Explain the need of each component in the Bolster Architecture

The Bolster Architecture is a reference architecture for Big Data systems. It was designed by a research-oriented team to facilitate the design of Big Data systems across multiple organizations. The architecture builds on two families of Big Data architectures and leverages the Big Data dimensions (the five "V's") and the requirements defined for each of them.

The Bolster Architecture consists of several components, each with a specific function:

  1. Semantic Layer: contains the Metadata Management System (MDM), which stores machine-readable semantic annotations describing the data and the rest of the components.

  2. Batch Layer: stores a copy of the master data set in raw format and pre-computes Batch Views.

  3. Speed Layer: ingests and processes real-time data in the form of streams, producing Real-time Views.

  4. Serving Layer: stores, indexes, and publishes the Batch Views so they can be queried.

The Bolster Architecture addresses the two main drawbacks identified in the λ-architecture: it considers Variety, Variability, and Veracity as first-class citizens and refines the λ-architecture to facilitate its instantiation. These changes boil down to a precise definition of the components and their interconnections.

14.7 Map the components of the Bolster Architecture to the RDBMS Architecture

The Semantic Layer in the Bolster Architecture, which contains the Metadata Management System (MDM), can be compared to the system catalog in an RDBMS. The MDM stores machine-readable semantic annotations, similar to how the system catalog in an RDBMS stores metadata about the database.

The Batch Layer, Speed Layer, and Serving Layer in the Bolster Architecture handle data storage and processing. These layers can be compared to the data storage and processing components of an RDBMS.

The Batch Layer stores a copy of the master data set in raw format as data are ingested and pre-computes Batch Views that are provided to the Serving Layer. This is similar to how an RDBMS stores and processes data.

The Speed Layer ingests and processes real-time data in the form of streams. Results are then stored, indexed, and published in Real-time Views. This can be compared to the real-time data processing capabilities of some RDBMSs.

The Serving Layer stores, indexes, and publishes data resulting from the Batch Layer processing in Batch Views. This is similar to how an RDBMS stores, indexes, and retrieves data.

However, it’s important to note that the Bolster Architecture and an RDBMS have different purposes and are designed to handle different types of data and workloads. The Bolster Architecture is designed for Big Data systems, which often involve processing large volumes of unstructured or semi-structured data, while an RDBMS is typically used for structured data and transactional workloads.

14.8 Given a use case, define its software architecture

15 In-Memory Columnar Databases (New Relational Architecture)

15.1 Justify the viability of in-memory databases

In-memory databases store data in the main memory (RAM) of a computer rather than on disk, which results in much faster access times and thus faster query processing. Some reasons justifying their viability are:

  1. RAM has become much cheaper and much larger over time, and 64-bit architectures can address terabytes of main memory, so many operational data sets fit entirely in memory (possibly distributed over the memory of several nodes).

  2. Main memory access is orders of magnitude faster than disk access, removing the I/O bottleneck of traditional DBMSs.

  3. Multi-core CPUs with large caches allow massively parallel processing of the data kept in memory.

However, it’s worth noting that in-memory databases also have their trade-offs. They can be more expensive due to the cost of RAM, and they can also face challenges with data persistence, as data in memory is volatile. But these challenges can often be overcome with techniques like database replication and persistent storage backups.

15.2 Explain the principles of NUMA architecture

NUMA (Non-Uniform Memory Access) is a computer memory design used in multiprocessing, where the memory access time depends on the memory location relative to the processor. Under NUMA, a processor can access its own local memory faster than non-local memory (memory local to another processor or memory shared between processors). This facilitates scaling up by growing a single machine. There are two main trade-offs:

Also, this architecture allows parallelism in the different cores and CPUs.

This architecture is also much faster and more expensive than a cluster of shared-nothing machines, but it is also much harder to manage if we want to push it to its full potential. The principles of NUMA are:

  1. Each processor (or socket) has its own local memory, directly attached to it.

  2. All memory belongs to a single shared address space, so any processor can access any memory location.

  3. Access latency is non-uniform: accessing local memory is faster than accessing memory attached to another processor, so performance depends on data placement.

NUMA architectures are used in systems with large numbers of processors, providing benefits over traditional symmetric multiprocessing (SMP) systems. However, to get the best performance from a NUMA system, software (like a database management system) should be NUMA-aware and designed to minimize non-local memory accesses.

15.3 Enumerate 3 techniques to optimize cache usage

  1. Locality awareness: aims at reducing the number of idle CPU cycles waiting for the arrival of data. The system can benefit from spatial locality, i.e., there is a high chance of accessing data contiguous to data already accessed, and temporal locality, i.e., there is a high chance of accessing data that has been recently accessed (a small sketch illustrating spatial locality appears after this list).

  2. Flexible caching: has the purpose of bringing and keeping as much relevant data as possible in cache. It is related to associativity, which describes how different blocks of memory map to the same cache location. A fully associative cache allows a memory block to be placed in any cache location; this flexibility allows the cache to keep more relevant data, but it comes at the cost of increased complexity and slower lookups. Therefore: low associativity facilitates searching, since a memory block can only be found in a few places, but complicates replacement; high associativity facilitates replacement, but makes searching more difficult.

  3. Cache-conscious design: this is relevant for DBMS developers, who need to be aware of the cache line size and use multiples of that size in their data structures, optimizing access to data.
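As announced in the first item, the following small Python/NumPy sketch illustrates spatial locality: summing a row-major matrix row by row reads memory sequentially, while summing it column by column strides across cache lines; the matrix size is arbitrary and the exact timings depend on the machine:

import time
import numpy as np

# A row-major (C-ordered) matrix: the elements of a row are contiguous in memory.
a = np.random.rand(5_000, 5_000)

t0 = time.perf_counter()
row_sums = [a[i, :].sum() for i in range(a.shape[0])]   # sequential, cache-friendly access
t1 = time.perf_counter()
col_sums = [a[:, j].sum() for j in range(a.shape[1])]   # strided access: roughly one cache line touched per element
t2 = time.perf_counter()

print(f"row-wise sums:    {t1 - t0:.2f} s")
print(f"column-wise sums: {t2 - t1:.2f} s")   # typically noticeably slower on the same data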

15.4 Give 4 arguments supporting columnar storage

15.5 Explain 3 classical optimization techniques in RDBMS related to column storage

15.6 Sketch the functional architecture of SanssouciDB

15.7 Explain 4 optimizations implemented in SanssouciDB to improve data access

15.8 Explain how to choose the best layout

We should choose columnar storage when:

  1. Queries access only a few columns but scan many rows (e.g., aggregations in analytical workloads).

  2. The data is mostly read or appended, with few updates of existing tuples.

  3. Columns compress well (low cardinality, sorted or repetitive values).

We should choose row storage when:

  1. Queries access most of the attributes of only a few tuples (typical OLTP access patterns).

  2. Tuples are frequently inserted or updated as a whole.

15.9 Identify the difference between column-stores and NOSQL related to transactions

In column-stores, we avoid replication management by storing only atomic data, relying on the performance obtained in aggregate queries. With on-the-fly aggregation, the aggregate values are always up to date. This way we also achieve higher concurrency, since different measures can be aggregated concurrently over different columns. We can still replicate some small and static tables. Since these systems provide full ACID support, consistency is strong, following an eager/secondary-copy synchronization scheme.

15.10 Explain 3 difficulties of pipelining in column-stores

  1. Short process trees: In column-stores, operations are often applied to each column independently, which can result in a shallow process tree (with few levels of nested operations). This can limit the opportunities for pipelining, as each operation may complete quickly and there may be fewer opportunities to overlap operations.

  2. Some operators need all input data at once: Certain operators, like sort or aggregate, need to process the entire input column before they can produce any output. This can make it more difficult to pipeline operations, as you have to wait for these operators to complete before passing their output to the next operator in the pipeline.

  3. Skewed cost of operations: In column-stores, the cost of operations can vary significantly depending on the specific column and operation. For example, operations on a column with a high cardinality (many unique values) or a large amount of data may be more expensive than operations on a column with low cardinality or less data. This skew can make it harder to evenly distribute work across a pipeline and maintain high throughput.

15.11 Explain 3 problems to implement parallelism in in-memory column-stores

  1. High startup cost: Parallelism involves dividing a task among multiple processing units. However, there is a startup cost associated with setting up these parallel tasks, including the time to divide the task and synchronize the results. If the tasks are too small or the number of tasks is too high, the overhead of managing these tasks can outweigh the benefits of parallel execution.

  2. Contention (at hardware level): Parallel tasks often need to access shared resources, such as memory or input/output devices. If multiple tasks try to access these resources at the same time, they can interfere with each other, leading to contention. This can degrade performance and limit the effectiveness of parallel execution.

  3. Skewness: refers to a situation where the data or the workload is not evenly distributed across the tasks. For example, if one task gets assigned a larger portion of the data or a more complex operation, it can take longer to complete than the other tasks. This can cause the other tasks to sit idle while waiting for the skewed task to complete, reducing the overall efficiency of parallel execution.

15.12 Explain 5 query optimization techniques specific to columnar storage

15.13 Given a data setting, justify the choice of either row or column storage

15.14 Given the data in a column, use run-length encoding with dictionary to compress it
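Although no worked example is given here, the following Python sketch shows one possible way to apply run-length encoding on top of a (sorted) dictionary to a column; the example column of country codes is made up:

def dictionary_rle_encode(column):
    # 1. Dictionary encoding: map each distinct value to a small integer code
    #    (sorted, as column stores often keep an order-preserving dictionary).
    dictionary = {value: code for code, value in enumerate(sorted(set(column)))}
    codes = [dictionary[value] for value in column]
    # 2. Run-length encoding: store (code, run_length) pairs instead of repeated codes.
    runs = []
    for code in codes:
        if runs and runs[-1][0] == code:
            runs[-1][1] += 1
        else:
            runs.append([code, 1])
    return dictionary, runs

def dictionary_rle_decode(dictionary, runs):
    reverse = {code: value for value, code in dictionary.items()}
    return [reverse[code] for code, length in runs for _ in range(length)]

# Hypothetical column of country values
column = ["ES", "ES", "ES", "FR", "FR", "DE", "DE", "DE", "DE", "ES"]
dictionary, runs = dictionary_rle_encode(column)
print(dictionary)   # {'DE': 0, 'ES': 1, 'FR': 2}
print(runs)         # [[1, 3], [2, 2], [0, 4], [1, 1]]
assert dictionary_rle_decode(dictionary, runs) == column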

References

[1]   Alberto Abelló and Sergi Nadal. Big data management. Lecture Notes.

[2]   Serge Abiteboul, Ioana Manolescu, Philippe Rigaux, Marie-Christine Rousset, and Pierre Senellart. Web Data Management. Cambridge University Press, 2011.

[3]   Eric A Brewer. Towards robust distributed systems. In PODC, volume 7, pages 343477–343502. Portland, OR, 2000.