
Google Building Blocks Review Flashcards

Google Specialized Software Systems

Google has developed several major software systems for internal processing:

  • MapReduce: A simplified way to write and run large-scale jobs on clusters of machines.
    • Used for generating production index data quickly and performing ad-hoc experiments rapidly.
    • Dean & Ghemawat, MapReduce: Simplified Data Processing on Large Clusters, OSDI 2004.
  • GFS (Google File System): A large-scale distributed file system.
    • Ghemawat, Gobioff, & Leung, The Google File System, SOSP 2003.
  • BigTable: A semi-structured storage system.
    • Provides online, efficient access to per-document information at any time.
    • Allows multiple processes to update per-document information asynchronously.
    • Critical for updating documents in minutes instead of hours.
    • Chang, Dean, Ghemawat, Hsieh, Wallach, Burrows, Chandra, Fikes, & Gruber, Bigtable: A Distributed Storage System for Structured Data, OSDI 2006.

Introduction to MapReduce

  • MapReduce is a methodology for exploiting parallelism in computing clouds (racks of interconnected processors).
  • It is a common way to analyze very large amounts of data.
  • Initially developed at Google.
  • In 2004, Google was using MapReduce to process 100TB/day of data.

How is MapReduce Used by Search Engines?

  • At Google:
    • Building Google's Search Index
    • Article clustering for Google News
    • Statistical machine translation
  • At Yahoo!:
    • Building Yahoo!'s Search Index
    • Spam detection for Yahoo! Mail
  • At Facebook:
    • Data mining
    • Ad optimization
    • Spam detection

Motivation Beyond Search Engines

  • Modern Internet applications require managing immense amounts of data quickly.
  • Many applications involve extremely regular data with ample opportunity to exploit parallelism.
  • Examples:
    1. Dish Network: Collecting every click of the remote.
      • Dish Network supplies TV reception via satellite and collects data on set-top boxes.
    2. Tesla: Collecting every usage of the car.
      • Tesla cars are connected to the cellular network and report all actions back to Tesla.

Why Parallelization is Hard

  • Parallelization is "easy" if processing can be cleanly split into n units.
  • There should also be an easy way to combine the outputs.
  • Complicated issues to deal with:
    • What if there are more work units than threads?
    • How to assign work units to worker threads?
    • How to aggregate/combine the results at the end?
    • How to know when all the workers have finished?
    • What if the work cannot be divided into completely separate tasks?
  • MapReduce solves these problems, so the programmer doesn't have to deal with them.

Programming with Multiple Threads Poses Challenges

  • Example (x and y are shared global variables):

    Thread 1:
    void foo() {
        x++;
        y = x;
    }
    
    Thread 2:
    void bar() {
        y++;
        x++;
    }
    
  • Assuming each statement executes atomically, if the initial state is x = 6, y = 0, the final values of x and y after both threads finish could be (8, 8) or (8, 7): the (8, 7) outcome arises when Thread 2's y++ runs before Thread 1's y = x but Thread 2's x++ runs after it; every other interleaving yields (8, 8).

Multithreaded = Unpredictability

  • Many operations that look like "one step" actually take several steps under the hood, at the assembly-code level (the sketch below makes these steps explicit).
  • When running a multithreaded program, the order of thread execution and interruptions are unknown.
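
To make the point concrete, here is a minimal Python sketch (my own illustration, not from the original material) that spells out the load/add/store steps hidden inside x++; the barrier is used only to force the "unlucky" interleaving so the lost update shows up on every run:

    import threading

    x = 6                               # shared variable
    barrier = threading.Barrier(2)      # only here to force the unlucky interleaving

    def increment_unsafe():
        # "x++" is really three steps: load, add, store.
        global x
        tmp = x                         # step 1: load x
        barrier.wait()                  # both threads pause after loading, before storing
        x = tmp + 1                     # steps 2-3: add and store

    threads = [threading.Thread(target=increment_unsafe) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(x)                            # prints 7, not 8: one increment was lost

Without the barrier the same loss can still happen; it is just rare and therefore hard to reproduce.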

The "corrected" example

  • Using a semaphore to guard access to x and y:

    Thread 1:
    void foo() {
        sem.lock();
        x++;
        y = x;
        sem.unlock();
    }
    
    Thread 2:
    void bar() {
        sem.lock();
        y++;
        x++;
        sem.unlock();
    }
    
  • A semaphore is an integer variable shared between threads; here it is used as a binary semaphore (a mutex) that protects the "critical section" from simultaneous access.
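
A rough Python analogue of the corrected example, using threading.Lock in the role of the binary semaphore (a sketch of the idea, not the original code):

    import threading

    x, y = 6, 0
    sem = threading.Lock()        # plays the role of the binary semaphore

    def foo():
        global x, y
        with sem:                 # sem.lock() ... sem.unlock()
            x += 1
            y = x

    def bar():
        global x, y
        with sem:
            y += 1
            x += 1

    t1 = threading.Thread(target=foo)
    t2 = threading.Thread(target=bar)
    t1.start(); t2.start()
    t1.join(); t2.join()
    print(x, y)                   # always 8 8: each critical section runs as a unit

With the critical sections serialized, the (8, 7) outcome from the uncorrected version can no longer occur.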

Processing Across a Machine Cluster Introduces Unpredictability on Many Levels

  • Synchronization problems apply beyond low-level operations within a critical section.
  • Other issues include:
    • Pulling work units from a queue
    • Assigning work units to an available thread
    • Work units reporting back to the master unit
    • Informing another thread that it can begin the "next phase" of processing
  • All require synchronization!
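
For a sense of how much coordination code this implies, here is a minimal hand-rolled sketch (purely illustrative; this is not how MapReduce itself is implemented) of a work queue, a pool of worker threads, and completion detection:

    import queue
    import threading

    work_q = queue.Queue()                    # thread-safe queue of work units
    results = []
    results_lock = threading.Lock()

    def worker():
        while True:
            unit = work_q.get()               # pull a work unit from the queue
            if unit is None:                  # sentinel: no more work
                work_q.task_done()
                return
            with results_lock:                # report the result back safely
                results.append(unit * unit)
            work_q.task_done()

    workers = [threading.Thread(target=worker) for _ in range(4)]
    for w in workers:
        w.start()

    for unit in range(10):                    # more work units than threads
        work_q.put(unit)
    for _ in workers:                         # one sentinel per worker
        work_q.put(None)

    work_q.join()                             # blocks until every unit is processed
    print(sorted(results))

Every bullet point above shows up even in this toy version; MapReduce exists so that application programmers do not have to write this coordination code themselves.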

How MapReduce Solves the Parallelization Problems

  • MapReduce provides:
    • Automatic parallelization of code across multiple threads and processors
    • Fault tolerance in case of node failures
    • I/O scheduling
    • Monitoring & Status updates

The Map/Reduce Paradigm

  1. A large number of records are broken into segments
  2. Map: extracts something of interest from each segment
  3. Group: sorts the intermediate results from each segment (sometimes called shuffle)
  4. Reduce: aggregates intermediate results
  5. Generate final output
  • Key idea: Re-phrase problems, so the input can be divided into parts and operated on in parallel; the results are combined to produce a solution to the original problem.

The Map & Reduce Routines

  • To use MapReduce, one must write two functions, called Map and Reduce.
  • The system manages the parallel execution and coordination of tasks automatically.
  • A MapReduce computation proceeds as follows:
    1. Some number of map tasks is each given one or more chunks to process.
    2. These map tasks turn the chunk into a sequence of key-value pairs; how the pairs are produced depends upon the code for the Map function.
    3. Key-value pairs from each Map task are collected by a master controller and sorted by key; keys are divided among all the Reduce tasks, so all key-value pairs with the same key wind up at the same Reduce task.
    4. Reduce tasks work on one key at a time and combine all the values associated with that key in some way; the manner of combination depends upon the Reduce code.
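
A single-machine Python sketch of the same flow (illustrative only; real MapReduce distributes these phases across many Workers): map each chunk to key-value pairs, group the pairs by key, then reduce each group.

    from collections import defaultdict

    def map_reduce(chunks, map_fn, reduce_fn):
        # Map: each chunk becomes a sequence of (key, value) pairs
        pairs = []
        for chunk in chunks:
            pairs.extend(map_fn(chunk))

        # Group (shuffle): collect all values that share a key, sorted by key
        groups = defaultdict(list)
        for key, value in pairs:
            groups[key].append(value)

        # Reduce: combine the values for each key
        return {key: reduce_fn(key, values) for key, values in sorted(groups.items())}

The word-count example that follows is one concrete choice of map_fn and reduce_fn.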

A MapReduce Example - Counting Word Occurrences

  • Counting the number of occurrences for each word in a collection of documents.
  • The input file is a repository of documents.
  • Each document is an element passed to a separate processor.
  • The Map function:
    • Parses the document and extracts each word, using each word as a key of type String (w1, w2, …).
    • For each word, it assigns an integer value of 1.
    • Each processor outputs key-value pairs where the key is a word, and the value is always 1, namely (w1, 1), (w2, 1), …, (wn, 1).
    • If a word w appears n times in a single document, then there will be n key-value pairs (w, 1) in the output of the processor handling that document.
    • If a word w appears m times among all documents, then there will be m key-value pairs (w, 1) in the output.

Count Word Occurrences Pseudo-Code

Map(String input_key, String input_value):
    // input_key: document name
    // input_value: document contents
    for each word w in input_value:
        EmitIntermediate(w, "1");

Reduce(String output_key, Iterator intermediate_values):
    // output_key: a word
    // intermediate_values: a list of counts
    int result = 0;
    for each v in intermediate_values:
        result += parseInt(v);
    Emit(AsString(result));
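
A direct Python translation of the pseudo-code, runnable on a single machine (the grouping step here stands in for the sort-by-key that the framework performs between Map and Reduce):

    from collections import defaultdict

    def map_wordcount(doc_name, doc_contents):
        # input_key: document name, input_value: document contents
        for word in doc_contents.split():
            yield (word, 1)                              # EmitIntermediate(w, "1")

    def reduce_wordcount(word, counts):
        # output_key: a word, intermediate_values: a list of counts
        return sum(counts)                               # Emit(AsString(result))

    documents = {"doc1": "the cat sat on the mat",
                 "doc2": "the dog sat"}

    groups = defaultdict(list)                           # group intermediate pairs by word
    for name, contents in documents.items():
        for word, count in map_wordcount(name, contents):
            groups[word].append(count)

    totals = {word: reduce_wordcount(word, counts) for word, counts in groups.items()}
    print(totals)   # {'the': 3, 'cat': 1, 'sat': 2, 'on': 1, 'mat': 1, 'dog': 1}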

Distributed Execution Overview

  • The user program forks a Master controller process and some number of Worker processes at different compute nodes.
  • A Worker handles either Map tasks or Reduce tasks, but not both.

Master:

  • Creates some number of Map and Reduce tasks
  • Assigns tasks to Worker processes
  • Typically, there is one Map task for every chunk of the input.
  • Keeps track of the status of each Map and Reduce task (states are: idle, executing on a Worker, completed).
  • Each Map task is assigned one or more chunks of the input file(s) and executes the code.
  • The Map task creates a file for each Reduce task on the local disk of the Worker that executes the Map task.
  • The Master is told of the location and sizes of each of these files and the Reduce task for which each is destined.

Looking Under the Hood at the Reduce Task

  • The master controller process knows how many Reduce tasks there will be, say r.
    • the user defines r
  • The master controller picks a hash function that applies to keys and produces a bucket number from 0 to r-1.
  • Each key output by a Map task is hashed, and its key-value pair is put in one of r local files.
    • Each file will be processed by a Reduce task.
  • After all Map tasks have completed successfully, the master controller merges the files from each Map task that are destined for a particular Reduce task and feeds the merged file to that Reduce task.
  • For each key k, the input to the Reduce task that handles key k is a pair (k, [v1, …, vn]), where (k, v1), (k, v2), …, (k, vn) are all the key-value pairs with key k coming from all the Map tasks.
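
A sketch of the partitioning step. The MapReduce paper describes the default partitioning function as hash(key) mod R; everything else below (variable names, the list standing in for local files) is illustrative.

    r = 4                                       # number of Reduce tasks, chosen by the user

    def partition(key, r):
        # Bucket number in 0 .. r-1. A real system uses a deterministic hash so that
        # every Map task agrees; Python's built-in hash is salted per process, so this
        # toy version is only consistent within a single run.
        return hash(key) % r

    # Each Map task writes its key-value pairs into r local files, one per Reduce task.
    local_files = [[] for _ in range(r)]        # stand-ins for the r files on local disk
    for key, value in [("the", 1), ("cat", 1), ("the", 1)]:
        local_files[partition(key, r)].append((key, value))
    # Both ("the", 1) pairs land in the same file, so the same Reduce task sees them.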

Explanation of the Reduce Task

  • The Reduce function is written to take pairs consisting of a key and a list of associated values and combines them in some way.
  • The Reduce function output is a sequence of key-value pairs consisting of each input key k paired with the combined value.
  • Outputs from all Reduce tasks are merged into a single file.
  • For the word-count example, the Reduce function adds up all the values and outputs a sequence of (w, m) pairs, where w is a word that appears at least once in the documents and m is its total number of occurrences.
  • The combining operation used by Reduce is generally associative and commutative, so the values can be combined in any order and yield the same result.

The Problems Google Tried to Solve with a New File System

Google needed a large-scale, high-performance, unified storage system that would:

  1. Be Global: Any client can access (read/write) any file, allowing data sharing among different applications.
  2. Support Automatic Sharding: Sharding of large files over multiple machines improves performance by allowing parallel processes on each file chunk and also deals with large files that cannot fit into a single disk.
  3. Support Automatic Recovery: Recovery from failures.
  4. Be Optimized for Sequential Access: Optimization for sequential access to huge files and for read and append operations, which are the most common.

The Google File System Design Assumptions

  • Files will be HUGE
    • Multi-gigabyte files are common
    • Multi-terabyte datasets
  • Most file modifications are appends (atomic)
    • Random writes practically non-existent
  • Once written, files are read mostly sequentially
  • Caching not terribly important
  • There will be a single master server monitoring multiple chunk servers

Google File System (GFS) Top Level View

  • GFS is a proprietary distributed file system for efficient, reliable access to data using large clusters of commodity hardware.
  • Files are divided into fixed-size chunks of 64 megabytes, similar to clusters or sectors in regular file systems.
  • Files are usually appended to or read; they are only extremely rarely overwritten or shrunk.
  • GFS is designed for system-to-system interaction.
  • Chunk servers replicate the data automatically.

Master Server and Chunk Servers

Master Server:

  • Holds all metadata:
    • Namespace (directory hierarchy)
    • Access control information (per-file)
    • Mapping from files to chunks
    • Current locations of chunks (chunkservers)
  • Delegates consistency management
  • Garbage collects orphaned chunks
  • Migrates chunks between chunk servers

Chunk Server:

  • Stores 64 MB file chunks on local disk using the standard Linux filesystem, each with a version number and checksum
  • Read/write requests specify chunk handle and byte range
  • Chunks are replicated on a configurable number of chunkservers (default: 3)
  • No caching of file data (beyond standard Linux buffer cache)

Google File System (GFS)

  • Master manages metadata.
  • Files are broken into chunks (typically 64 MB).
  • Data transfers happen directly between clients/chunkservers.
  • Each chunk is replicated on three machines (by default) for safety.
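
A simplified model of a GFS read, assuming the read falls within a single chunk (the function and method names below are illustrative, not the real GFS interfaces): the client turns a byte offset into a chunk index, asks the master only for metadata, then fetches the bytes directly from a chunkserver.

    CHUNK_SIZE = 64 * 1024 * 1024                    # 64 MB fixed-size chunks

    def gfs_read(master, filename, offset, length):
        chunk_index = offset // CHUNK_SIZE           # which chunk holds this offset
        # 1. The master supplies metadata only: the chunk handle and replica locations.
        handle, replicas = master.lookup(filename, chunk_index)       # hypothetical call
        # 2. Data flows directly between the client and a chunkserver.
        chunkserver = replicas[0]                    # any replica will do for a read
        return chunkserver.read_chunk(handle, offset % CHUNK_SIZE, length)  # hypothetical call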

GFS: Major Aspects

  • Append vs. Rewrite:
    • GFS is optimized for appended files rather than rewrites.
    • Clients within Google rarely need to overwrite files; they add data onto the end of files instead.
    • Overwriting data on a file in the GFS is possible but not handled very efficiently.
  • Which Replica Does GFS Use?
    • GFS separates replicas into two categories: primary replicas and secondary replicas.
    • A primary replica is the chunk that a chunkserver sends to a client.
    • Secondary replicas serve as backups on other chunkservers.
    • The master server decides which chunks will act as primary or secondary.
    • If the client makes changes to the data in the chunk, the master server lets the chunkservers with secondary replicas know they have to copy the new chunk off the primary chunkserver to stay current.
  • What About Big Files?
    • If a client creates a write request that affects multiple chunks of a particularly large file, the GFS breaks the overall write request up into an individual request for each chunk.
    • The rest of the process is the same as a normal write request.
  • Heartbeats and Handshakes
    • The GFS components report their status through short messages called heartbeats and handshakes.
    • These short messages allow the master server to stay current with each chunkserver's status.
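
A sketch of how a write that spans several chunks could be split into one request per chunk, as described under "What About Big Files?" above (only the splitting is shown; each per-chunk request then follows the normal primary/secondary write path):

    CHUNK_SIZE = 64 * 1024 * 1024                    # 64 MB

    def split_write(offset, data):
        """Split one logical write into (chunk_index, offset_within_chunk, bytes) requests."""
        requests = []
        while data:
            chunk_index = offset // CHUNK_SIZE
            chunk_offset = offset % CHUNK_SIZE
            room = CHUNK_SIZE - chunk_offset         # bytes left in the current chunk
            piece, data = data[:room], data[room:]
            requests.append((chunk_index, chunk_offset, piece))
            offset += len(piece)
        return requests

    # A 4-byte write starting 2 bytes before a chunk boundary becomes two requests:
    # [(0, CHUNK_SIZE - 2, b"ab"), (1, 0, b"cd")]
    print(split_write(CHUNK_SIZE - 2, b"abcd"))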

Google File System vs. BigTable

  • GFS provides raw data storage.
  • Google needs a system for handling:
    • Trillions of URLs
    • Geographic locations such as physical entities, roads, satellite image data, etc.
    • Per-user data for billions of people, including preference settings, recent queries, and searches.
  • It must be capable of storing semi-structured data.
  • Reliable, scalable, etc.
  • Bigtable is a compressed, high-performance, proprietary data storage system built on top of the Google File System.
  • It is used by a number of Google applications, such as web indexing, MapReduce, Google Maps, YouTube, and Gmail.

Big Table Data Model

  • Not a Full Relational Data Model
  • Provides a simple data model
  • Supports dynamic control over data layout
    • Allows clients to reason about the locality properties of the data in the underlying storage
  • A Table in Bigtable is a:
    • Sparse
    • Distributed
    • Persistent
    • Multidimensional
    • Sorted map

Bigtable Storage Model

  • Data is indexed using row and column names
  • Data is treated as uninterpreted strings
    • (row:string, column:string, time:int64) → string
  • Rows
    • Data maintained in lexicographic order by row key
    • Tablet: rows with consecutive keys
    • Units of distribution and load balancing
  • Columns
    • Column families
  • Cells
  • Timestamps
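
A toy in-memory model of that map (illustrative only; this is not the Bigtable client API): each (row, column) intersection holds a list of timestamped versions, newest first, and column names follow the family:qualifier convention.

    import time
    from collections import defaultdict

    class TinyTable:
        """Toy model of Bigtable's (row, column, timestamp) -> string map."""

        def __init__(self):
            # row key -> column ("family:qualifier") -> [(timestamp, value), ...], newest first.
            # Real Bigtable also keeps row keys in lexicographic order to form tablets.
            self._rows = defaultdict(lambda: defaultdict(list))

        def put(self, row, column, value, timestamp=None):
            ts = timestamp if timestamp is not None else int(time.time() * 1_000_000)
            cells = self._rows[row][column]
            cells.append((ts, value))
            cells.sort(reverse=True)                 # newest version first

        def get(self, row, column):
            cells = self._rows[row][column]
            return cells[0][1] if cells else None    # most recent version wins

    table = TinyTable()
    table.put("com.cnn.www", "anchor:cnnsi.com", "CNN")   # reversed-URL row key, as in the Bigtable paper
    print(table.get("com.cnn.www", "anchor:cnnsi.com"))   # -> CNN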

Rows

  • Name is an arbitrary string
  • Access to data in a row is atomic
  • Row creation is implicit upon storing data
  • Rows ordered lexicographically
  • Rows close together lexicographically usually reside on one or a small number of machines
  • Each row/column intersection can contain multiple cells
    • Each cell contains a unique timestamped version of the data for that row and column
    • Storing multiple cells in a column provides a record of how the stored data has changed over time

Columns

  • Columns have a two-level name structure:
    • family:optional_qualifier
  • Column family
    • Unit of access control
    • Has associated type information
  • Qualifier gives unbounded columns
    • Additional level of indexing, if desired

Timestamps

  • Used to store different versions of data in a cell
    • New writes default to current time, but timestamps for writes can also be set explicitly by clients
  • Garbage Collection
    • Per-column-family settings to tell Bigtable to GC
      • "Only retain the most recent K values in a cell"
      • "Keep values until they are older than K seconds"
  • API: Create / delete tables and column families
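
A sketch of those two per-column-family garbage-collection policies applied to one cell's version list (the function name and signature are mine, purely illustrative):

    import time

    def gc_cell_versions(cells, max_versions=None, max_age_seconds=None):
        """cells: [(timestamp_in_microseconds, value), ...], sorted newest first."""
        kept = cells
        if max_versions is not None:                       # "only retain the most recent K values"
            kept = kept[:max_versions]
        if max_age_seconds is not None:                    # "keep values until they are older than K seconds"
            cutoff = (time.time() - max_age_seconds) * 1_000_000
            kept = [(ts, v) for ts, v in kept if ts >= cutoff]
        return kept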