Developing Frogtek

El blog del Departamento de Tecnología

Categoría: python (página 2 de 2)

Google App Engine: Lanzar un MapReduce desde un cron

Con las antiguas limitaciones del GAE, la ejecución de un proceso batch (generación de informes, actualización de una columna en todas las entidades de un tipo…) requería montar complejos sistemas diviendo nuestro proceso en pequeñas partes que serían procesadas por tareas enlazadas.

Desde hace poco (SDK 1.4.0.) el límite de tiempo para las peticiones en segundo plano (tareas y cron) aumentó:  ¡de 30 segundos a 10 minutos! , por lo que,  la mayoría de los procesos antes mencionados podrían ejecutarse sin mayor problema.

Sin embargo, cuando lo que necesitamos es recorrer todas las entidades de un modelo y actualizarlas/operar con ellas, mapreduce nos aporta algunas ventajas:

  • Comodidad a la hora de definir un nuevo script (mapreduce handler).
  • Eficiencia: mapreduce se ejecuta atutomáticamente con un número configurable de shards.
  • Escalabilidad infinita (aquí no tenemos la limitación de 10 minutos en caso de tener millones de entidades).

El problema es que los mapreduce están pensados para lanzarse manualmente desde la consola de administración de mapreduce (url_de_mi_aplicacion/mapreduce) pero en ocasiones nos gustaría poder lanzarlos programáticamente. Estudiando un poco el funcionamiento de esta consola y el código de mapreduce es sencillo modificarlo para conseguir nuestro propósito. Por ejemplo, lanzar un mapreduce desde un cron que se ejecute todos los lunes a la misma hora.

Lo primero es definir la entrada en nuestro cron.yaml, por ejemplo:

cron:
- description: purge tokens
 url: /admin/cron/execute_mapreduce
 schedule: every monday 00:00
 timezone: Europe/Madrid

Nosotros usamos Django, así que definiríamos una entrada en urls.py para mapear la url definida en el cron con nuestro método python encargado de llamar al mapreduce:

(r'^admin/cron/execute_mapreduce$', 'ruta_del_modulo.run_map_reduce'),

Y ahora viene la duda. ¿Cómo llamamos al mapreduce desde dicho método si sólo se puede a través de la consola? Esstudiando el comportamiento de la consola con una herramienta tipo firebug, vemos que cuando se pulsa en el botón “run” se ejecuta una llamada AJAX con una serie de parámetros. Si replicamos dicha llamada en nuestro código ya tenemos el mismo comportamiento.

Si  la definición de nuestro mapreduce (en el mapreduce.yaml) es ésta:

- name: execute_something_from_cron
 mapper:
 input_reader: mapreduce.input_readers.DatastoreInputReader
 handler: map_reduce_handlers.executed_from_cron.execute_something_from_cron
 params:
 - name: entity_kind
 default: points_of_sale.models.pos_model.PointOfSale

Entonces la llamada por código (en localhost) sería esta:

 form_fields = {
 "mapper_handler": "map_reduce_handlers.executed_from_cron.execute_something_from_cron",
 "mapper_input_reader": "mapreduce.input_readers.DatastoreInputReader",
 "mapper_params.entity_kind": "points_of_sale.models.pos_model.PointOfSale",
 "name": "calculate_aggregates_of_aggregates"
 }

 form_data = urllib.urlencode(form_fields)
 result = urlfetch.fetch(url="http://localhost:8080/mapreduce/command/start_job",
 payload=form_data,
 method=urlfetch.POST,
 headers={'Content-Type': 'application/x-www-form-urlencoded'},
 deadline=10)

Por último, para probar que todo funciona ya sólo queda ejecutar el cron. En local se puede hacer a través de la url:

http://localhost:8080/_ah/admin/cron

Pero cuidado, para poder ejecutarlo en local necesitamos un pequeño cambio. Lo normal es que mapreduce requiera permisos de administrador, lo cual, se indica en app.yaml:

- url: /mapreduce(/.*)?
 script: mapreduce/main.py
 login: admin  

Si ejecutamos el cron en la nube no hay problema porque tiene permisos de admin, sin embargo, el test local se ejecuta con permisos de usuario y, por ello, es necesario comentar la línea “login: admin” para las pruebas locales.

Ya parece que tenemos todos lo necesario, así que, lanzamos el test del cron desde el nagegador…obteniendo el siguiente resultado:

ApplicationError: 2 timed out

¿Por qué ocurre esto? La explicación se encuentra en el fichero base_handler.py de la libería mapreduce. Concretamente en el método _handle_wrapper:

def _handle_wrapper(self):
 if self.request.headers.get("X-Requested-With") != "XMLHttpRequest":
 logging.error(self.request.headers)
 logging.error("Got JSON request with no X-Requested-With header")
 self.response.set_status(
 403, message="Got JSON request with no X-Requested-With header")
 return

Como vemos, se están vetando explícitamente aquellas peticiones que no sean AJAX.

Afortunadamente, existe una solución muy fácil que evita cambiar el código de la librería mapreduce (sería peligroso porque en cualquier momento puede cambiar de versión o incorporarse por defecto sin necesidad de incluirla en el despliegue): se trata de añadir la cabecera XMLHttpRequest a nuestra petición:

headers={'Content-Type': 'application/x-www-form-urlencoded',
 'X-Requested-With': "XMLHttpRequest"},

Y ahora ya podemos ejecutar el mapreduce desde el cron y consultar la consola/logs cuando queramos ver su resultado. También se podrían lanzar peticiones (al igual que lo hace la consola) para comprobar que todo ha ido bien por código, pero eso ya es otra historia…

Utilizando gzip en las comunicaciones

Como ya sabéis hace dos semanas estuvimos de ranatón. En mi equipo, uno de nuestros objetivos era optimizar las comunicaciones entre el servidor y el cliente. El objetivo era optimizar todo lo que no habíamos optimizado todavía, ya que, en Colombia (donde están la mayoría de nuestros usuarios), las tarifas de datos aún son muy restrictivas en cuanto a datos enviados/recibidos.

La comunicación actualmente se estaba realizando usando Json y en texto plano.

Tal como este:

{"D":1,"sale_code":"301","type":"sale",
"local_created":"2011-01-05 09:26:57.042","balance_sheet_code":"501",
"customer_code":"-1","products":{"generic1":{"Q":1,"P":300,"F":1}},
"final_price":300,"paid":1,"cash_operation_code":"401"}

En este ejemplo estabamos enviando una operación de tipo venta. Como primera optimización, decidimos acortar los nombres de las propiedades. De este modo nos quedó:

{"D":1,"SC":"301","T":"sale","LC":"2011-01-05 09:26:57.042",
"BS":"501","CC":"-1","PR":{"generic1":{"Q":1,"P":300,"F":1}},
"FP":300,"PD":1,"CO":"401"}

Así conseguimos reducir bastante el tráfico de datos, pero necesitabamos reducirlo todavía más, así que tomamos la decisión de utilizar la compresión gzip, y comprimir todo el texto enviado y recibido por el cliente. Y después de comprimirlo lo convertimos en una string con BASE64, de este modo formateabamos los datos sin retornos de carro y todo quedaba, si cabe, mas uniforme:

H4sIAAAAAAAAADWOzQoCMQyEX0Xm3JW03SrmaMWT
aHU9KR5EiwiyygpeSt/dtCIE8s3kN+H5AuMwrK3toXCNH5GkBT
fgY8ICrBU6L66t9l7ofX5EwVVxDWndkIQb0YzNhN10TK2R8ryT
sqtDvnQ2hcIOnHCLfRzuF114Wy8EsCVSWIrKWfLfCL8PvLyDVpb
lU/4C8o/at7UAAAA=

No es el caso más optimo para comprimir, pero hemos calculado que enviamos un 50% menos de datos al enviar la información comprimida. Para poder medir esto utilizamos WireShark con un filtro tal como este (Gracias Jose):

(ip.src == 192.168.1.9  &&
           ip.dst == 192.168.1.12 &&
           tcp.port == 8080)

Para poder realizar la compresión en el cliente Android utilizamos la clase GZIPOutputStream de este modo:

public static byte[] zipStringToBytes( String input  ) throws IOException
  {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    BufferedOutputStream bufos = new BufferedOutputStream(new GZIPOutputStream(bos));
    bufos.write( input.getBytes() );
    bufos.close();
    byte[] retval= bos.toByteArray();
    bos.close();
    return retval;
  }

Y en la parte del servidor, nosotros utilizamos Google App Engine:

def decode_client_data(request_object):
    if(request_object.META.get('HTTP_CONTENT_ENCODING',None)!=None):
        gzip_string = base64.standard_b64decode(str(request_object.raw_post_data))
        url_file_handle=StringIO(gzip_string)
        gzip_file_handle = gzip.GzipFile(fileobj=url_file_handle)
        decompressed_data = gzip_file_handle.read()
        gzip_file_handle.close()
        return decompressed_data
    else:
        return request_object.raw_post_data

Gracias a la integración continua podemos asegurarnos de que ningún cliente trabajará contra una servidor que no acepte gzip, pero para evitarnos problemas y poder sacar versiones a producción del servidor sin tener que hacerlo del cliente, decidimos programar el servidor para que acepte operaciones sin comprimir y en versión de nombres de propiedades largas.

Para que el servidor nos envie la información comprimida en gzip es mucho más fácil, puesto que lo hace de manera automática siempre y cuando en las peticiones le digamos que nuestro cliente acepta compresión gzip. Nosotros lo hemos hecho de este modo:

HttpPost httppost = connectPOST(url);
   httppost.setHeader("Accept-Encoding", "gzip");
   httppost.setHeader("Content-Encoding", "GZIP");
   httppost.setHeader("User-Agent", "gzip");

Hay que tener muy presente que el gzip automático por parte de GAE no funciona en local.

Si tenéis alguna duda o sugerencia no dudéis en comentarla.

Segunda frogtek code kata

Como ya sabréis en Frogtek solemos realizar un TPV cada dos semana. En esta ocasión hemos decidido realizar nuestra segunda code kata con un ejercicio llamado Backwards Talk y que consiste en invertir un string. Por cambiar un poco con respecto a la primera sesión, hemos optado por realizarla en Python, salvo Guillermo que se ha atrevido con Javascript.

La solución de Javi Martínez nos ha parecido una de las más acertadas, haciendo uso de recursividad. En Frogtek solemos decir que los programadores que utilizan recursividad deberían llevar pajarita, ¿veremos mañana a Martínez con una?

def reverseString(str):
    if len(str) <= 0:
        return ''
    else:
        return reverseString(str[1:]) + str[0]

¿Comentarios? ¿Mejoras?

Automatización

Gracias a la costumbre en Frogtek de fomentar la formación a través de la compra libros, he podido leer “The Pragmatic Programmer” magnífico libro recomendable para cualquier programdor con ganas de mejorar.

En el mismo dan un par de consejos que me han parecido muy interesantes y que aparte de estar muy ligados, creo que seguimos bastante aquí en Frogtek. El primero:

Use the power of command shells

Indica que debemos usar los comandos shell como herramientas para facilitar nuestro trabajo. Por ejemplo en mi caso utilizo scripts shell para eliminar nuestro software del emulador y cargar la última versión de nuestro servidor, consiguiendo con un solo comando algo que si hiciera a través de la interfaz me llevaría el doble o el triple de tiempo. El segundo:

Don’t use manual procedures

Nos dice que no hagamos de forma manual nada que pueda hacer un script o programa. Será más propenso a errores y dará más quebraderos de cabeza a la hora de ser repetido en diferentes entornos. En este caso el ejemplo es nuestro proceso de paso a producción a través de Hudson, la compilación de todos los proyectos y paso de test, etc.

En resumidas cuentas automatización, todo lo que sea susceptible de ser automático debería serlo. Será más rápido, más confiable y menos propenso a errores. No tengas miedo al coste de configuración, si lo vas ha hacer más de dos veces, vale la pena automatizar. Vamos o eso nos dice la experiencia en Frogtek …

Eficiencia en Google App Engine: Appstats

Nuestra elección a la hora de subir datos a la nube fue Google App Engine, el servicio que permite ejecutar aplicaciones web en la infraestructura de Google. El desarrollo  en esta plataforma presenta ciertas características que obligan a cambiar el chip desde un primer momento y que nos fuerzan implícitamente a tener en cuenta la eficiencia y, sobretodo, la escalabilidad. Dicho esto, siempre hay momentos a lo largo del camino en los que uno se plantea revisar la eficiencia global de la aplicación para mejorar su tiempo de respuesta, reducir los consumos de cpu y, de paso, reducir la factura cobrada por Google.

La primera vez que nos enfrentamos a esta situación decidimos usar cProfile y pstats para estudiar los tiempos de nuestro código Python centrándonos en aspectos clásicos como optimización de búsquedas, mejora de la eficiencia en la manera de concatenar cadenas, reducción del número de iteraciones en un bucle…la verdad es que de poco nos sirvió, ya que, veíamos que la mayor parte del tiempo de proceso se gastaba en algo que en el profile se indicaba con líneas como esta:

_apphosting_runtime___python__apiproxy.Wait

Es decir, que la mayor parte del tiempo nuestra aplicación estaba esperando a que se completase el proceso realizado por una llamada a alguna API de Google.

Estudiando en detalle el lugar del código en el que aparecen estas líneas, se puede averiguar a qué API se están refiriendo pero, por suerte, todo ese trabajo dejó de ser necesario gracias al descubrimiento de Appstats:

Appstats es una herramienta creada por  Guido van Rossum (creador de Python y actual empleado de Google) que forma parte del SDK y nos permite ver en detalle en qué emplea realmente el tiempo nuestra aplicación mediante el estudio de las RPC, es decir, las llamadas remotas que nuestra aplicación hace a las distintas API’s de Google. Su instalación es muy sencilla y podemos instalarla incluso en producción porque consume muy pocos recursos.

En  el siguiente video, el propio Guido, nos explica las bondades de esta herramienta indispensable:

En el video se muestra cómo instalar y usar la herramienta, una breve explicación de su funcionamiento y consumo, posibilidades de configuración y algunos ejemplos de uso como, por ejemplo, detección de ciertos patrones de ineficiencia comunes a muchas aplicaciones : Patterns of Doom!

Recientes entradas