segunda-feira, 30 de junho de 2008

Google Treasure Hunt 2 - MapReduce?

Para resolver o Google Treasure Hunt 2, decidi usar um velho programa para busca de arquivos implementado em C++ usando multi-threading. Eu mudei algumas partes do código, só para garantir que possa fazer a busca pelos 2 padrões especificados no problema, e escolhi usar o framework Qt (da Trolltech) que tem algumas funções muito legais para lidar com arquivos, e isso é realmente verdade – você não precisa de mais nada, a não ser as classes QFile e QDir e derivadas delas.

Se você gosta de programação distribuída, talvez você já tenha ouvido falar no modelo MapReduce, desenvolvido pela Google (www.google.com). De acordo com Jeffrey Dean e Sanjay Ghemawat, em um artigo (“Distributed Programming with MapReduce) escrito para o interessante livro “Beautiful Code” (O’Reilly, 2007), MapReduce was developed as a way of simplifying the development of large-scale computations at Google.” (tradução: “O MapReduce foi desenvolvido como uma forma de simplicar o desenvolvimento de processamento em larga-escala no Google”) Acredito que esse problema proposto no Google Treasure Hunt não seja realmente um “processamento em larga-escala”, mas eu gostei da forma como o MapReduce funciona, e decidi aplicar esse conceito ao meu programa.

Aqui está o problema número 2 do Google Treasure Hunt:

Here is a random zip archive for you to download:
GoogleTreasureHunt08_14338682343771558225.zip

Unzip the archive, then process the resulting files to obtain a numeric result. You'll be taking the sum of lines from files matching a certain description, and multiplying those sums together to obtain a final result. Note that files have many different extensions, like '.pdf' and '.js', but all are plain text files containing a small number of lines of text.

Sum of line 4 for all files with path or name containing BCD and ending in .js
Sum of line 5 for all files with path or name containing foo and ending in .xml
Hint: If the requested line does not exist, do not increment the sum.

Multiply all the above sums together and enter the product below.
(Note: Answer must be an exact, decimal representation of the number.)

Se você der uma olhada no código fonte, existem 2 threads; o primeiro (Mapper) obtêm a lista de arquivos com um padrão de nomes fornecido (no meu caso, nomes contendo “BCD” e com extensão “.js”, ou contendo “foo” e terminando com “.xml”); o segundo thread (Reducer) utiliza a lista com nomes de arquivos do thread Mapper e faz uma outra busca mais profunda dentro de cada um dos arquivos pré-selecionados (para obter os valores de números nas linhas de número “4”). O principal método do Mapper é o seguinte:

bool FileSearchMap::depthFirstTraversal(const QString& currentDir)

{

QDir dirP(currentDir);

QString temp;

QString fileName;

QStringList entryP = dirP.entryList();

QStringListIterator entryL(entryP);

while (entryL.hasNext())

{

producerMutex.lock();

if (myFileCount == 5)

{

bufferNotFull.wait(&producerMutex);

} // if - mutex and signal

producerMutex.unlock();

fileName.clear();

temp = entryL.next();

if ((temp != ".") && (temp != ".."))

{

fileName = currentDir;

fileName.append('\\');

fileName.append(temp);

if (isDirectory(fileName))

{

depthFirstTraversal(fileName);

}

else

{

if (isRegular(fileName))

{

unsigned int ftype = 0;

if ( ( fileName.endsWith(".js") ) &&

( fileName.contains("BCD") ) )

ftype = TXT_FILE_PATTERN;

else if ( ( fileName.endsWith(".xml") ) &&

( fileName.contains("foo") ) )

ftype = XML_FILE_PATTERN;

if ( ftype != 0 )

{

DirEntry dentry( fileName, ftype );

queueMutex.lock();

textFiles.push(dentry);

queueMutex.unlock();

producerMutex.lock();

++myFileCount;

producerMutex.unlock();

bufferNotEmpty.wakeAll();

}

}

} // if - isDirectory / isRegular

} // if

} // while

return true;

}

O método acima sera chamado exatamente uma vez, a partir do método run da minha implementação do QThread no Mapper. Colocando o método depthFirstTraversal para rodar, primeiramente ele obtêm todos os arquivos do diretóro, chamando o método entryList, da classe QDir. Então um iterator é instanciado sobre a lista de arquivos anteriormente obtida, e, enquanto estiver iterando sobre os arquivos, e antes que qualquer coisa eu faço uma chamada à função wait da estrutura QWaitCondition: a wait condition é um tipo de variável de condição usada para sincronizar threads, interrompendo-o no caso de ter obtido 5 arquivos (claro, pode ser qualquer número que você queira). Porque obter exatamente 5 arquivos e interromper a execução, esperando para que a estrutura QWaitCondition seja liberada? Isso é usado de forma a criar um buffer com capacidade para 5 arquivos (no máximo), a partir do qual o thread Reducer pode iterar sobre cada um dos arquivos listados pelo thread Mapper, abrir cada um desses arquivos, e procurar pelo critério de busca dado. Dessa forma, o thread Reducer pode lidar apenas com uma lista de 5 arquivos, esgotá-la, e depois permitir que o Mapper preencha com mais alguns novos arquivos encontrados. Essa é a primeira regra usada para sincronizar os 2 threads: Mapper, comece a cobrir o buffer com os nomes dos arquivos encontrados, até que ele tenha 5 elementos. Daí, espere até que o Reducer consuma alguns desses arquivos, pois a partir disso você pode obter mais alguns arquivos. Faça isso até o fim dos tempos”. Agora, o thread Reducer, obtendo todos os arquivos que o Mapper consiga procurar:

void KeySearchReduce::run()

{

DirEntry dirEntry;

int cnt = 0;

do

{

prod->producerMutex.lock();

if (prod->myFileCount == 0)

{

prod->bufferNotEmpty.wait(&prod->producerMutex);

if (prod->isSearchDown())

{

prod->producerMutex.unlock();

return;

}

}

prod->producerMutex.unlock();

prod->queueMutex.lock();

dirEntry = prod->getFirstTextFile();

prod->queueMutex.unlock();

QFile infile(dirEntry.getFileName());

if (!infile.open(QIODevice::ReadOnly | QIODevice::Text) )

continue;

QString line;

unsigned int i = 0;

QTextStream in(&infile);

if ( dirEntry.getFileType() ==

FileSearchMap::TXT_FILE_PATTERN ) {

while (!in.atEnd())

{

line = in.readLine();

++i;

if ( i == 4 ) {

lineP2Count += atol(line.toLocal8Bit().constData());

++foundCount;

break;

}

} // while

} else if ( dirEntry.getFileType() ==

FileSearchMap::XML_FILE_PATTERN ) {

while (!in.atEnd())

{

line = in.readLine();

++i;

if ( i == 5 ) {

lineP1Count += atol(line.toLocal8Bit().constData());

++foundCount;

break;

}

} // while

} // if - getFileType()

infile.close();

prod->producerMutex.lock();

prod->myFileCount--;

prod->producerMutex.unlock();

prod->bufferNotFull.wakeAll();

} while (!prod->isSearchDown());

}

O Reducer na primeira linha do comando “do” diz o seguinte: Ei Mapper, você tem algum arquivo para que eu possa procurar pelo informação numérica na linha X? Caso você não tenha arquivo algum, eu esperarei um pouco, até que você tenha algo para mim...” Então, para cada arquivo que o thread Reducer processa, ele avisará ao Mapper, de forma a que ele possa alimentar mais o buffer. No caso do Mapper estiver dormindo em um buffer cheio com 5 elementos, ele será acordado e então recomeçará a cobrir o buffer com arquivos – isso é feito com uma chamada a prod->bufferNotFull.wakeAll(), encontrado no fim do loop principal do Reducer. A chamada a wakeAll faz com que todos os threads que estejam dormindo sobre essa variável de condição acordem imediatamente (ou quase). Com isso, o Mapper pode estar dormindo na linha bufferNotFull.wait(&producerMutex), no início do loop de busca, e essa chamada à função wakeAll a partir do Reducer pode liberar o bufferNotFull (uma instância do QWaitCondition), permitindo ao Mapper reiniciar o processo de completar o seu buffer de 5 elementos de novo. Tudo o que o Mapper tem que fazer é cobrir o buffer com nomes de arquivos.

Nós estamos lidando aqui com recursos compartilhados, e os mutexes são de uso obrigatório (veja as referências às instâncias de QMutex no código fonte) uma vez que essa aplicação é multithreaded, e não queremos que ocorram condições de corrida.

Colocando tudo junto:

FileSearchMap* mapper = new FileSearchMap;

KeySearchReduce* reducer = new KeySearchReduce;

mapper->setDirectory( dir );

reducer->setMapper( mapper );

mapper->start();

reducer->start();

mapper->wait();

reducer->wait();

long long a1, a2;

cout << argv[1] << " contains " <<>getFoundCount()

<< " files that contains " <<>getLineCount()

<< " lines for the pattern 1 ( " << (a1 = reducer->getLineP1Count()) <<

" ) and pattern 2 ( " << (a2 = reducer->getLineP2Count()) << " )." << endl;

long long response = a1*a2;

cout << "Response: (" <<>")" << endl;

Aqui, a chamada getLineP1Count() retorna a soma de todos os numeros na linha 4, nos arquivos com o primeiro padrão; e getLineP2Count() faz o mesmo para o padrão de busca 2.

Claro, dá pra resolver o problema todo sem usar capacidade de multithread, mas aí não tem graça! :)


Marcadores: , , , ,

1 Comments:

At 4:17 PM, Blogger meu deixa ser eu said...

Boa Tarde,


Tenho uma oportunidade para programador de busca que creio que possa te interessar. Você poderia entrar em contato com nara.ramos@lbslocal.com ?

Abs, Nara

 

Postar um comentário

<< Home