jueves, 30 de junio de 2016

Actualizando owncloud 8.1.8 a 9.0.0

Aunque la relación de Owncloud con Debian no pasa por sus mejores momentos ([Pkg-owncloud-maintainers] About no more ownCloud in Debian) en realidad es posible seguir instalando al primero sobre el segundo.

Siempre es recomendable mantener un ritmo de actualizaciones sano, pero si no es así, parece que es posible actualizar gradualmente (Esto es de suma importancia) de la siguiente forma:

7.0.3-debian >> 7.0.14 >> 8.0.12 >> 8.1.7>> 8.2.4 >> 9.0.2
Según se recoge en Upgrading ownCloud on Debian Stable to official packages, Esta forma gradual de hacer las actualizaciones pueden ser molestas, pero lo contrario (7.0.14 a 8.1.7, por ejemplo) es un proceso temerario que no estará automatizado por las herramientas de owncloud.

Actualizé la instalación en producción desde el punto 8.1.7 (En este contexto, se refieren al release. Mi versión era la 8.1.8) al 8.2.4.
En esta versión hubo un molesto problema con la app Documentos, cuya solución descrita en ASSERTION FAILED: tried to unsubscribe unknown callback from event "input/compositionstart" era actualizar a 9.0.2. Comprobé que con actualizar a 9.0.0 era suficiente.

Los cambios en /etc/apt/sources.list.d/owncloud.list describen básicamente el proceso de actualización:
#deb http://download.opensuse.org/repositories/isv:ownCloud:community/Debian_7.0/ /

# Dejaron de usar el servicio de opensuse
#deb http://download.owncloud.org/download/repositories/8.2/Debian_7.0/ /

# Esta no sirve del todo. Hay un con el repositorio según parece
#deb http://download.owncloud.org/download/repositories/stable/Debian_7.0/ /

## Esta es totalmente estable, aunque no es precisamente la última versión pero si la primera del relase 9.0.0, me pareció buena idea llegar a este punto
deb http://download.owncloud.org/download/repositories/9.0.0/Debian_7.0/ /

Por último, queda señalar que el paquete para la versión 9.0 se ha cambiado a owncloud-files, y que el paquete owncloud-deps no existe para esta versión de Debian. Cuando se instale owncloud-files, apache, amavis y php van a irse por un rato (Se supone que todos serían reemplazados por owncloud-deps). Una vez instalado se instalan todos de nuevo y todo funciona de maravilla.
aptitude install owncloud-files
Se instalarán los siguiente paquetes NUEVOS:     
  owncloud-files{b} 
0 paquetes actualizados, 1 nuevos instalados, 0 para eliminar y 0 sin actualizar.
Necesito descargar 32,7 MB de ficheros. Después de desempaquetar se usarán 93,6 MB.
No se satisfacen las dependencias de los siguientes paquetes:
 owncloud-files : Entra en conflicto: owncloud (<= 8.99.99) pero está instalado 8.2.5-1.1.
                  Entra en conflicto: owncloud-config-apache (<= 8.99.99) pero está instalado 8.2.5-1.1.
                  Entra en conflicto: owncloud-server (<= 8.99.99) pero está instalado 8.2.5-1.1.
Las acciones siguientes resolverán estas dependencias

     Eliminar los paquetes siguientes:
1)     owncloud                       
2)     owncloud-config-apache         
3)     owncloud-server                



¿Acepta esta solución? [Y/n/q/?]y
Se instalarán los siguiente paquetes NUEVOS:
  owncloud-files 
Se ELIMINARÁN los siguientes paquetes:
  apache2{u} clamav{u} clamav-base{u} clamav-freshclam{u} libclamav7{u} libllvm3.0{u} libmcrypt4{u} libpq5{u} owncloud{a} owncloud-config-apache{a} owncloud-server{a} php-xml-parser{u} php5{u} php5-curl{u} php5-intl{u} php5-mcrypt{u} php5-mysql{u} php5-pgsql{u} php5-sqlite{u} 
0 paquetes actualizados, 1 nuevos instalados, 19 para eliminar y 0 sin actualizar.
Necesito descargar 32,7 MB de ficheros. Después de desempaquetar se liberarán 17,9 MB.
¿Quiere continuar? [Y/n/?] y
Y claro, backup. Hacer el backup de un sistema de backup es un poco gracioso pero así las cosas

jueves, 23 de junio de 2016

Sistema de voceo con Elastix

Los sistemas de voceo más comunes suelen estar basados en hardware y ser soluciones específicas, pero es posible configurar Asterisk (El que ya esta configurado por Elastix) para tener uno sustentado en nuestro servicio de VoIP.
Con esto, una tarjeta de sonido para el servidor es el único costo en hardware que vamos a tener.

Lo primero es configurar en /etc/asterisk/modules.conf la carga de los módulos que vamos a necesitar para que Asterisk sea capaz de usar el sistema de sonido del sistema. Buscamos las siguientes líneas para que queden de la siguiente forma:
;
; Load either OSS or ALSA, not both
; By default, load no console driver
;
noload => chan_alsa.so
load => chan_oss.so
Luego, habrá que configurar el fichero /etc/asterisk/oss.conf
con algunas configuraciones propias del módulo. Hay que revisar algo en el sistema si es que existe el dispositivo /dev/dsp o por el contrario será /dev/dsp1. El primero esta configurado por defecto, para el segundo (Y para otros, supongo) usamos la opción device
[general]
autoanswer=yes
context=from-internal
overridecontext=yes
extension=s
language=en
playbackonly=yes
device = /dev/dsp1
Por último, configuramos la extensión a usar en el fichero /etc/asterisk/extensions_custom.conf, que por lo demás es el lugar donde se configurar extensiones de este tipo
[voceo-neomano]
; Primero hay que ver como funciona sin el
exten => 1030,1,Dial(console/dsp,20,A(beep))
exten => 1030,1,Set(PITCH_SHIFT(both)=.15)
exten => 1030,n,Hangup()
A voceo-neomano será necesario agregarlo bajo [from-internal-custom] con include. Como ejemplo, esa sección queda de la siguiente forma:
[from-internal-custom]
exten => 1234,1,Playback(demo-congrats)         ; extensions can dial 1234
exten => 1234,2,Hangup()
exten => h,1,Hangup()
include => agentlogin
include => conferences
include => calendar-event
include => weather-wakeup
include => voceo-neomano

Fuentes:
Sistema de voceo anti-feedback de bajo costo para Elastix
Unable to re-open DSP device /dev/dsp

miércoles, 15 de junio de 2016

Apuntes sobre el error de Squid3 / SquidGuard en Debian Jessie

Como la lectura no es tan amena como quisiera, recomiendo que siga a la respuesta a este problema, que publiqué un tiempo después:
Apuntes sobre el error de Squid / SquidGuard en Debian: Solucionado 

La configuración mínima necesaria para Squid3 parece ir de la siguiente forma:
acl usuarios src 10.40.20.0/24
acl usuarios src 10.20.20.0/24


acl Safe_ports port 80 443 8080 20 21
## Según https://forums.gentoo.org/viewtopic-t-952948-start-0.html
## Hay que comentar esto en Squid 3.4 porque ya esta configurado por defecto
# acl manager proto cache_object
acl CONNECT method CONNECT
acl NONE method NONE

ftp_passive off

host_verify_strict on 

http_access deny NONE
http_access deny !Safe_ports
http_access allow usuarios
http_access deny all

## Es necesario que hay al menos uno sin intercept
http_port 10.20.20.1:3128 
## TODO: ¿Funcionará al descomentar lo siguiente?
# http_port 10.20.20.1:3128 intercept
http_port 10.40.20.1:3128 intercept

cache_mem 469 MB
## TODO: Ni siquiera recuerdo el origen de estas líneas, pero su funcionamiento no tiene 
## implicaciones sobre nuestro problema
#cache_dir aufs /var/spool/squid3 500 16 256
#cache_dir aufs /var/spool/squid3-${process_number} 500 16 256 min-size=322560 

#debug_options 84,1
#debug_options 85,2
debug_options ALL,2
coredump_dir /var/spool/squid3/dump

url_rewrite_program /usr/bin/squidGuard -c /etc/squidguard/squidGuard.conf -d
url_rewrite_children 5 startup=0 idle=1 concurrency=3
url_rewrite_host_header off 

refresh_pattern .       0   20% 4320

relaxed_header_parser warn
connect_timeout 20 seconds
shutdown_lifetime 3 seconds
cache_mgr fws@salud.gob.sv
httpd_suppress_version_string on
visible_hostname firewall.dominio.com

error_default_language  es-sv
prefer_direct on
check_hostnames on

dns_retransmit_interval 2 seconds
dns_timeout 1 minutes
dns_nameservers 10.10.20.20 10.10.20.21
dns_v4_first on
La configuración mínima necesaria en squidGuard, y estoy hablando que esto es apenas un ejemplo para nada funcional de como va a trabajar realmente:
#
# CONFIG FILE FOR SQUIDGUARD
#
# Caution: do NOT use comments inside { }
#

dbhome /var/lib/squidguard/db
logdir /var/log/squidguard

#
# TIME RULES:
# abbrev for weekdays: 
# s = sun, m = mon, t =tue, w = wed, h = thu, f = fri, a = sat

time laboral {
    weekly * 00:15 - 12:29
    weekly * 13:15 - 23:55
}

#
# SOURCE ADDRESSES:
#

src usuarios {
 ip   10.20.20.0/24 
}

#
# DESTINATION CLASSES:
#
# [see also in file dest-snippet.txt]

dest deportes {
    domainlist BL/recreation/sports/domains
    log deportes.log
}

dest webtv {
    domainlist BL/webtv/domains
    log ocio.log
}

#dest adult {
# domainlist BL/adult/domains
# urllist  BL/adult/urls
# expressionlist BL/adult/expressions
# redirect http://admin.foo.bar.de/cgi-bin/blocked.cgi?clientaddr=%a&clientname=%n&clientuser=%i&clientgroup=%s&targetgroup=%t&url=%u
#}

#
# ACL RULES:
#

acl {
 usuarios {
     pass  !in-addr !deportes !webtv any
 }

 default {
  pass  none
  redirect http://admin.foo.bar.de/cgi-bin/blocked.cgi?clientaddr=%a&clientname=%n&clientuser=%i&clientgroup=%s&targetgroup=%t&url=%u
 }
}
Esto funcionaba en Debian Squeezy/Wheezy perfectamente. Tiene la ventaja de ser presumiblemente rápido a la hora de filtrar tráfico, la configuración se realiza por medio de ACL que son bastante fáciles de entender y tiene altas posibilidades con las listas En Debian Jessie hay un problema con las versiones presentes en los respositorios, precisamente en la forma en que squid3 le comunica los datos de la petición HTTP al redirector SquidGuard. Cuando se hace una prueba a redireccionar desde squidGuard
$ echo "http://anontv.com 10.20.20.11 - - GET" | squidGuard -c /etc/squidguard/squidGuard.conf 
OK rewrite-url="http://admin.foo.bar.de/cgi-bin/blocked.cgi?clientaddr=10.20.20.11&clientname=&clientuser=&clientgroup=usuarios&targetgroup=webtv&url=http://anontv.com"
Mientras que /var/log/squid3/cache.log es posible ver
2016/06/14 19:22:20.711 kid1| client_side_reply.cc(1969) processReplyAccessResult: The reply for GET http://admin.foo.bar.de/cgi-bin/blocked.cgi?clientaddr=http:&clientname=/www.anontv.com/&clientuser=10.20.20.1/firewall.salud.gob.sv&clientgroup=default&targetgroup=none&url=0 is ALLOWED, because it matched 'usuarios'
cuando se hace una petición a squid3 desde un equipo cliente. Usando un navegador, telnet o algo como wget. Por lo pronto no encuentro la solución a este problema, ahora mismo estoy bajando el DVD de Debian Testing, quizá en Strech los paquetes disponibles ya no tengan ese problema. Puede verse como el clientaddr que devuelve SquidGuard es diferente en cada caso. Así que debemos suponer que ese es el valor que squidGuard toma como IP a la hora de relacionarlo con las ACL.

Fuentes:

sábado, 11 de junio de 2016

Colander: O de como validar un JSON en Pyramid

Confieso que he dejado alguna aplicación sin que, una vez validados los datos en el cliente, los valide en el servidor.
Pero esta vez quiero hacer las cosas bien, así que encontré de suerte a Colander (Con una documentación bastante útil hasta ahora) que es capaz de realizar algo como validar los datos JSON una vez llegan a nuestra aplicación. Y se integra bien con Pyramid. Y no hay ningún problema con los test, excepto escribirlos.
Añadimos colander como dependencia de nuestro proyecto en requires de ./ambiente/aplicacion/setup.py
(...)

requires = [
    'pyramid',
    'pyramid_debugtoolbar',
    'waitress',
    'nose',
    'webtest',
    'coverage',
    'colander'
    ]

(...)
Lo instalamos
# Este sí, desde el directorio ./ambiente/aplicacion es la mejor idea
python setup.py develop
Creamos un fichero ./ambiente/aplicacion/aplicacion/schemas/usuario.py el que creamos una clase que, descendiendo de algún tipo específico de Colander, sea capaz de definir la estructura de nuestro objeto JSON, aunque en realidad podemos hacer validaciones de cadenas simples y todo bien.
mkdir ./ambiente/aplicacion/aplicacion/schemas
touch ./ambiente/aplicacion/aplicacion/schemas/__init__.py
Y creamos el fichero ./ambiente/aplicacion/aplicacion/schemas/usuario.py con el siguiente contenido:
# coding: utf-8
import colander

class Palabra(colander.SequenceSchema):
    # Palabra es un string utf-8 de al menos dos caracteres 
    palabra = colander.SchemaNode(colander.String('utf-8'), validator=colander.Length(min=2))

class UsuarioEsquema(colander.MappingSchema):
    # nombre es un string utf-8
    nombre = colander.SchemaNode(colander.String('utf-8'))
    # apellido es un string utf-8
    apellido = colander.SchemaNode(colander.String('utf-8'))
    # palabras es una lista de palabras
    palabras = Palabra(validator=colander.Length(min=1))
if __name__ == '__main__':
    esquema = Data()
    data = {'nombre': 'Alexander', 'apellido': 'Ortíz', 'palabras': ['usuario', 'formidable']}
    data = {'nombre': 389, 'palabras': ['usuario', 'formidable']}
Y en lo que ya parece una costumbre, actualizamos nuestra vista ./ambiente/aplicacion/aplicacion/views/actividades.py
# coding: utf-8
from pyramid.view import view_config
from pyramid.httpexceptions import HTTPBadRequest
from colander import Invalid

from ..fichero_app.ficheros import Usuarios
from ..schemas.usuario import UsuarioEsquema

# Empieza el trabajo con la autenticación
from pyramid.security import Allow, Deny, Everyone, NO_PERMISSION_REQUIRED, Authenticated

# Dejamos por acá un esquema listo para usarse
esquema = UsuarioEsquema()

@view_config(route_name='ficheros_listado', renderer='json', permission='listar')
def ficheros_listado(request):
    """Cuando request_method se configura acà, el mensaje es diferente porque 
    la operación alcanzada es diferente
        The resource could not be found.


     predicate mismatch for view get_listar_ficheros (request_method = GET,HEAD)
    Por tanto lo mejor es configurarlo allá en __init__
    """
    ficheros = Usuarios()
    listado = ficheros.listado()
    return {'respuesta': listado}

@view_config(route_name='ficheros_detalle', renderer='json', permission='detallar')
def ficheros_detalle(request):
    usuario = request.matchdict['usuario']
    ficheros = Usuarios()
    detalle = ficheros.detalle(usuario)
    return {'respuesta': detalle}

@view_config(route_name='ficheros_creacion', renderer='json', permission='creacion')
def ficheros_creacion(request):
    try:
        usuario = request.json_body['usuario']
        data = esquema.deserialize(request.json_body['data'])
    except Invalid as e:
        return HTTPBadRequest(json_body=e.asdict())
    except Exception as e:
        return HTTPBadRequest()
    ficheros = Usuarios()
    creacion = ficheros.creacion(usuario, data)
    return {'respuesta': creacion}

@view_config(route_name='ficheros_modificacion', renderer='json', permission='modificacion')
def ficheros_modificacion(request):
    usuario = request.matchdict['usuario']
    try:
        data = esquema.deserialize(request.json_body['data'])
    except Invalid as e:
        return HTTPBadRequest(json_body=e.asdict())
    except Exception as e:
        return HTTPBadRequest()
    ficheros = Usuarios()
    modificacion = ficheros.modificacion(usuario, data)
    return {'respuesta': modificacion}

@view_config(route_name='ficheros_borrado', renderer='json', permission='borrado')
def ficheros_borrado(request):
    usuario = request.matchdict['usuario']
    ficheros = Usuarios()
    borrado = ficheros.borrado(usuario)

    return {'respuesta': borrado}
Y para mantener las buenas costumbres, agregamos a Creacion un método test_ficheros_creacion_malformed en ./ambiente/aplicacion/aplicacion/tests/testFuncionales.py de la siguiente forma:
    def test_ficheros_creacion_malformed(self):
        datos = {'usuario': 'fcornejo',  'data': {'nombre': 'Flor', 'apellido':'Cornejo', 'palabras': ['ente', 'obvio']}}
        # Dañamos el nombre, así nos aseguramos que sea ese contenido el equivocado 
        datos['data']['nombre'] = 389
        respuesta = self.testapp.post_json('/ficheros', status=400, params=datos)
        self.assertRegexpMatches(respuesta.json_body['nombre'], '389 is not a string')

jueves, 9 de junio de 2016

Test funcionales en Pyramid para vistas que requieren autenticación

Los test funcionales no están funcionando en este momento.
======================================================================
ERROR: test_ficheros_borrado (aplicacion.tests.testFuncionales.Borrado)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/var/www/ambiente/aplicacion/aplicacion/tests/testFuncionales.py", line 78, in setUp
    self.testapp.post_json('/ficheros', status=200, params=datos)
  File "/var/www/ambiente/local/lib/python2.7/site-packages/WebTest-2.0.21-py2.7.egg/webtest/utils.py", line 36, in wrapper
    return self._gen_request(method, url, **kw)
  File "/var/www/ambiente/local/lib/python2.7/site-packages/WebTest-2.0.21-py2.7.egg/webtest/app.py", line 740, in _gen_request
    expect_errors=expect_errors)
  File "/var/www/ambiente/local/lib/python2.7/site-packages/WebTest-2.0.21-py2.7.egg/webtest/app.py", line 636, in do_request
    self._check_status(status, res)
  File "/var/www/ambiente/local/lib/python2.7/site-packages/WebTest-2.0.21-py2.7.egg/webtest/app.py", line 671, in _check_status
    "Bad response: %s (not %s)", res_status, status)
AppError: Bad response: 403 Forbidden (not 200)

(...)
Como mencioné, estos ya suceden en un nivel bastante arriba de la aplicación, casi que son las peticiones tan normales como las que hacemos con el navegador.

Los test unitarios siguen testando partes específicas del código sin importarle en realidad la actitud HTTP, incluso aunque en unos de ellos usemos un pyramid.request.Request para la petición. Que es real, pero no tan real como se esperaría, y vemos que eso es un poco bueno.

Por otra parte, webtest.TestApp es tan real que de hecho acepta cookies. Ahora veremos lo conveniente de automatizar los test: Nos seguimos ahorrando hacer un intento de formulario con una simple petición post
self.testapp.post('/login', status=302, params={'username':'vtacius'})
Y algo tan simple en el setUp de cada clase nos permitirá autenticarnos en el sistema y realizar todas las operaciones que queramos comprobar
Así que autenticamos los test de la siguiente forma en el fichero correspondiente ./ambiente/aplicacion/aplicacion/tests/testFuncionales.py:
#coding:utf-8

from unittest import TestCase

class Listado(TestCase):
    def setUp(self):
        from aplicacion import main
        from webtest import TestApp
        
        app = main({})
        self.testapp = TestApp(app)
    
    def test_ficheros_listado(self):
        respuesta = self.testapp.get('/ficheros', status=200, xhr=True)
        self.assertEqual(respuesta.content_type, 'application/json')
        self.assertItemsEqual(respuesta.json_body['respuesta'], ['alortiz', 'kpenate', 'opineda']) 

class Detalle(TestCase):
    def setUp(self):
        from aplicacion import main
        from webtest import TestApp

        app = main({})
        self.testapp = TestApp(app)

    def test_unauth_detalle(self):
        respuesta = self.testapp.get('/ficheros/' + 'alortiz', status=403, xhr=True)
        self.assertRegexpMatches(respuesta.body, 'Access was denied to this resource')

    def test_ficheros_detalle(self):
        # Habrá que loguear en cada test si no se hace en setUp()
        self.testapp.post('/login', status=302, params={'username':'vtacius'})
        respuesta = self.testapp.get('/ficheros/' + 'alortiz', status=200, xhr=True)
        self.assertItemsEqual(respuesta.json_body['respuesta']['palabras'], ['ambiente', 'publico'])

    def test_ficheros_detalle_inexistente(self):
        # Habrá que loguear en cada test si no se hace en setUp()
        self.testapp.post('/login', status=302, params={'username':'vtacius'})
        respuesta = self.testapp.get('/ficheros/' + 'fitzcarraldo', status=200, xhr=True)
        self.assertEqual(respuesta.json_body['respuesta']['error'], 'No such file or directory')

class Creacion(TestCase):
    def setUp(self):
        from aplicacion import main
        from webtest import TestApp

        app = main({})
        self.testapp = TestApp(app)
        # Loguemos en la aplicación con datos reales
        self.testapp.post('/login', status=302, params={'username':'vtacius'})

    def tearDown(self):
        respuesta = self.testapp.delete('/ficheros/' + 'fcornejo', status=200)

    def test_ficheros_creacion(self):
        datos = {'usuario': 'fcornejo', 'data': {'nombre': 'Flor', 'apellido':'Cornejo', 'palabras': ['ente', 'obvio']}}
        respuesta = self.testapp.post_json('/ficheros', status=200, params=datos)
        self.assertDictEqual(respuesta.json_body['respuesta'], datos['data'])

class Modificacion(TestCase):
    def setUp(self):
        from aplicacion import main
        from webtest import TestApp

        app = main({})
        self.testapp = TestApp(app)
        # Loguemos en la aplicación con datos reales
        self.testapp.post('/login', status=302, params={'username':'vtacius'})

        datos = {'usuario': 'fcornejo', 'data': {'nombre': 'Flor', 'apellido':'Cornejo', 'palabras': ['ente', 'obvio']}}
        self.testapp.post_json('/ficheros', status=200, params=datos)
    
    def tearDown(self):
        respuesta = self.testapp.delete('/ficheros/' + 'fcornejo', status=200)
 
    def test_ficheros_modificacion(self):
        datos = {'usuario': 'fcornejo', 'data': {'nombre': 'Flor', 'apellido':'Cornejo', 'palabras': ['espejismo', 'olvido']}}
        respuesta = self.testapp.put_json('/ficheros/' + 'fcornejo', status=200, params=datos)
        self.assertDictEqual(respuesta.json_body['respuesta'], datos['data'])
       
class Borrado(TestCase):
    def setUp(self):
        from aplicacion import main
        from webtest import TestApp

        app = main({})
        self.testapp = TestApp(app)
        # Loguemos en la aplicación con datos reales
        self.testapp.post('/login', status=302, params={'username':'vtacius'})
        
        datos = {'usuario': 'fcornejo', 'data': {'nombre': 'Flor', 'apellido':'Cornejo', 'palabras': ['ente', 'obvio']}}
        self.testapp.post_json('/ficheros', status=200, params=datos)

    def test_ficheros_borrado(self):
        self.testapp.post('/login', status=302, params={'username':'vtacius'})
        
        respuesta = self.testapp.delete('/ficheros/' + 'fcornejo', status=200)

        self.assertEqual(respuesta.json_body['respuesta'], 'fcornejo')

Con un poco de verbosidad, la salida se verá ahora de la siguiente manera:
nosetests -v
test_ficheros_borrado (aplicacion.tests.testFuncionales.Borrado) ... ok
test_ficheros_creacion (aplicacion.tests.testFuncionales.Creacion) ... ok
test_ficheros_detalle (aplicacion.tests.testFuncionales.Detalle) ... ok
test_ficheros_detalle_inexistente (aplicacion.tests.testFuncionales.Detalle) ... ok
test_unauth_detalle (aplicacion.tests.testFuncionales.Detalle) ... ok
test_ficheros_listado (aplicacion.tests.testFuncionales.Listado) ... ok
test_ficheros_modificacion (aplicacion.tests.testFuncionales.Modificacion) ... ok
test_ficheros_borrado (aplicacion.tests.testUnitarios.Borrado) ... ok
test_ficheros_borrado_inexistente (aplicacion.tests.testUnitarios.Borrado) ... ok
test_ficheros_creacion (aplicacion.tests.testUnitarios.Creacion) ... ok
test_ficheros_creacion_json_malformed (aplicacion.tests.testUnitarios.Creacion) ... ok
test_ficheros_detalle (aplicacion.tests.testUnitarios.Detalle) ... ok
test_ficheros_detalle_noexistente (aplicacion.tests.testUnitarios.Detalle) ... ok
test_ficheros_listado (aplicacion.tests.testUnitarios.Listado) ... ok
test_ficheros_modificacion (aplicacion.tests.testUnitarios.Modificacion) ... ok

Name                                  Stmts   Miss  Cover   Missing
-------------------------------------------------------------------
aplicacion.py                            19      0   100%   
aplicacion/fichero_app.py                 0      0   100%   
aplicacion/fichero_app/ficheros.py       46     11    76%   21-22, 36, 52-55, 69-72
aplicacion/resources.py                   5      0   100%   
aplicacion/security.py                    5      0   100%   
aplicacion/tests.py                       0      0   100%   
aplicacion/tests/testFuncionales.py      69      0   100%   
aplicacion/tests/testUnitarios.py        99      0   100%   
aplicacion/views.py                       0      0   100%   
aplicacion/views/actividades.py          36      2    94%   48-49
aplicacion/views/autenticacion.py         7      0   100%   
-------------------------------------------------------------------
TOTAL                                   286     13    95%   
----------------------------------------------------------------------
Ran 15 tests in 1.546s

OK

martes, 7 de junio de 2016

Un vistazo a la autenticación y autorización con Pyramid

Acostumbrado a que las Universidades nos hagan ver ambas cosas como una misma, siempre es un poco complicado entrar a este tema en cualquier framework, aunque después que has abierto los ojos todos es coser y cantar. Según la documentación de Pyramid al respecto, la forma más sencilla de empezar a configurar la seguridad en esta wea es la siguiente: Creamos el fichero ./ambiente/aplicacion/aplicacion/security.py con el siguiente contenido:
USERS = {'vtacius':'editor',
          'viewer':'viewer'}
GROUPS = {'vtacius':['group:editors','group:admins']}

def groupfinder(userid, request):
    if userid in USERS:
        return GROUPS.get(userid, [])
Lo que necesitamos en realidad es a groupfinder. Los diccionarios USERS y GROUPS son, por decirlo de una forma, nuestra forma de simular nuestra base de datos. De hecho, esta función no es del todo obligatoria. Sólo la usamos si queremos agregar un principals a nuestro usuario logueado. Sea cual sea la forma que usemos para esta función (Seguramente habrá una consulta a una base de datos), lo importante es que debe devolver una lista de principals (Que a esta altura se antoja entenderlos como "roles") en la forma mostrada:
['groups:editors','groups:publishers']

Luego creamos el ficheros ./ambiente/aplicacion/aplicacion/resources.py con el siguiente contenido:
from pyramid.security import Allow, Everyone, Authenticated

class Root(object):

        __acl__ = [
                (Allow, Everyone, 'listar'),
                (Allow, Authenticated, 'detallar'),
                (Allow, 'groups:admins', 'creacion'),
                (Allow, 'groups:editors', 'modificacion'),
                (Allow, 'groups:admins', 'borrado')
                ]

        def __init__(self, request):
                pass

Que es básicamente la ACL de la aplicación. La ACL en cuestión de compone de una lista de tuplas (Llamadas ACE). que tienen la siguiente forma:
({{Acción a tomar}}, {{Principals necesario}}, {{Nombre de ACE}})
Para unir todo esto, necesitamos modificar el ./ambiente/aplicacion/aplicacion/__init__.py de nuestra aplicación para configurar la autenticación y autorización:
# coding: utf-8
from pyramid.config import Configurator

# Empieza el trabajo con la autenticación de la wea esta
from pyramid.authentication import AuthTktAuthenticationPolicy
from pyramid.authorization import ACLAuthorizationPolicy
from pyramid.security import Authenticated

from .security import groupfinder

def main(global_config, **settings):
    """ This function returns a Pyramid WSGI application.
    """
    # Empieza el trabajo con la autenticacion: Creo las politicas, así nomás y sin gracia
    # Por cierto, 'c3cre3t0 debería cambiarse a algo más personal
    # como callback, aparece nuestro amigo groupfinder, pero esto podría obviarse
    authn_policy = AuthTktAuthenticationPolicy('c3cr3t0', hashalg='sha512', callback=groupfinder)
    authz_policy = ACLAuthorizationPolicy()

    # Y por acá agregamos a resources.py
    config = Configurator(settings=settings, root_factory='.resources.Root')

    # Empieza el trabajo con la autenticacion: Configuro las politicas en la aplicación
    config.set_authentication_policy(authn_policy)
    config.set_authorization_policy(authz_policy)

    # Donde sucede la magia del login
    config.add_route(name='login', pattern='/login', request_method='POST')

    # Cuando request_method se encuentra acá, es devuelto un error como 
    # "The resource could not be found." 
    # Así que es como mejor configurar desde acá a request_method
    config.add_route('ficheros_listado', '/ficheros', request_method='GET')
    config.add_route('ficheros_detalle', '/ficheros/{usuario}', request_method='GET')
    config.add_route('ficheros_creacion', '/ficheros', request_method='POST')
    config.add_route('ficheros_modificacion', '/ficheros/{usuario}', request_method='PUT')
    config.add_route('ficheros_borrado', '/ficheros/{usuario}', request_method='DELETE')
    config.scan()
    return config.make_wsgi_app()
Y añadimos los permisos en nuestras vistas. Pues parece que sólo funcionan de esa manera. Así que el contenido de ./ambiente/aplicacion/aplicacion/views/actividades.py queda de la siguiente forma
# coding: utf-8
from pyramid.view import view_config
from ..fichero_app.ficheros import Usuarios
from pyramid import httpexceptions as exception

# Podrías usar los siguientes en lugar de los permisos personalizados que tienes en este momento, pero no
# La importación de hecho no es necesaria para nada
from pyramid.security import Everyone, Authenticated

@view_config(route_name='ficheros_listado', renderer='json', permission='listar')
def ficheros_listado(request):
    """Cuando request_method se configura acà, el mensaje es diferente porque 
    la operación alcanzada es diferente
        The resource could not be found.


     predicate mismatch for view get_listar_ficheros (request_method = GET,HEAD)
    Por tanto lo mejor es configurarlo allá en __init__
    """
    ficheros = Usuarios()
    listado = ficheros.listado()
    return {'respuesta': listado}

@view_config(route_name='ficheros_detalle', renderer='json', permission='detallar')
def ficheros_detalle(request):
    usuario = request.matchdict['usuario']
    ficheros = Usuarios()
    detalle = ficheros.detalle(usuario)
    return {'respuesta': detalle}

@view_config(route_name='ficheros_creacion', renderer='json', permission='creacion')
def ficheros_creacion(request):
    try:
        usuario = request.json_body['usuario']
        data = request.json_body['data']
    except Exception as e:
        return exception.HTTPBadRequest()
    ficheros = Usuarios()
    creacion = ficheros.creacion(usuario, data)
    return {'respuesta': creacion}

@view_config(route_name='ficheros_modificacion', renderer='json', permission='modificacion')
def ficheros_modificacion(request):
    usuario = request.matchdict['usuario']
    try:
        data = request.json_body['data']
    except Exception as e:
        return exception.HTTPBadRequest()
    ficheros = Usuarios()
    modificacion = ficheros.modificacion(usuario, data)
    return {'respuesta': modificacion}

@view_config(route_name='ficheros_borrado', renderer='json', permission='borrado')
def ficheros_borrado(request):
    usuario = request.matchdict['usuario']
    ficheros = Usuarios()
    borrado = ficheros.borrado(usuario)

    return {'respuesta': borrado}

Y solo faltaría agregar la pequeña vista que se encarga del login ./ambiente/aplicacion/aplicacion/views/login.py:
# coding: utf-8
from pyramid.view import view_config
from pyramid.security import remember
from pyramid.httpexceptions import HTTPFound

@view_config(route_name='login', request_method='POST')
def login(request):
        usuario = request.POST.get('username')
        cabeceras = remember(request, usuario)
        return HTTPFound(headers=cabeceras)
        # De la siguiente forma, no hay HTML devuelto, pero todo funciona bien
        #response = request.response
        #response.headerlist.extend(cabeceras)
        #return response
Así procedemos a probar nuestra aplicación desde consola:
$ curl -w '\n' -X GET http://localhost:6543/ficheros 
{"respuesta": ["alortiz", "kpenate", "opineda"]}
Atentos a este. -i agregará las cabeceras que recibimos de respuesta para que quede totalmente claro lo que pasa:
$ curl -i -w '\n' -X GET http://localhost:6543/ficheros/alortiz
HTTP/1.1 403 Forbidden
Content-Length: 1085
Content-Type: text/html; charset=UTF-8
Date: Tue, 07 Jun 2016 02:36:22 GMT
Server: waitress

<html>
 <head>
  <title>403 Forbidden</title>
 </head>
 <body>
  <h1>403 Forbidden</h1>
  Access was denied to this resource.<br/><br/>
debug_authorization of url http://localhost:6543/ficheros/alortiz (view name u'' against context &lt;aplicacion.resources.Root object at 0x7f501c490e90&gt;): ACLDenied permission 'detallar' via ACE '&lt;default deny&gt;' in ACL [('Allow', 'system.Everyone', 'listar'), ('Allow', 'system.Authenticated', 'detallar'), ('Allow', 'groups:admins', 'creacion'), ('Allow', 'groups:editors', 'modificar'), ('Allow', 'groups:admins', 'eliminar')] on context &lt;aplicacion.resources.Root object at 0x7f501c490e90&gt; for principals ['system.Everyone']


 <link rel="stylesheet" type="text/css" href="http://localhost:6543/_debug_toolbar/static/toolbar/toolbar_button.css">

<div id="pDebug">
    <div  id="pDebugToolbarHandle">
        <a title="Show Toolbar" id="pShowToolBarButton"
           href="http://localhost:6543/_debug_toolbar/313339393832303438363539353336" target="pDebugToolbar">&#171; FIXME: Debug Toolbar</a>
    </div>
</div>
</body>
</html>
HTTP/1.1 403 Forbidden. Que ha funcionado. Así que ahora probamos la autenticación mediante curl desde la siguiente forma:
$ curl -i -w "\n" -X POST http://localhost:6543/login -d "username=vtacius" 
HTTP/1.1 302 Found
Content-Length: 556
Content-Type: text/html; charset=UTF-8
Date: Tue, 07 Jun 2016 02:43:25 GMT
Location: http://localhost:6543/login
Server: waitress
Set-Cookie: auth_tkt=95c36928020725f40edd54266285117cfddd4d05b0cc42f969b9fee13a3ff52f4e5f98491c426f6c6bc5242d1ace926e264865a8cc1a30f127d1b1a6ec3de457575634cddnRhY2l1cw%3D%3D!userid_type:b64unicode; Path=/
Set-Cookie: auth_tkt=95c36928020725f40edd54266285117cfddd4d05b0cc42f969b9fee13a3ff52f4e5f98491c426f6c6bc5242d1ace926e264865a8cc1a30f127d1b1a6ec3de457575634cddnRhY2l1cw%3D%3D!userid_type:b64unicode; Domain=localhost; Path=/
Set-Cookie: auth_tkt=95c36928020725f40edd54266285117cfddd4d05b0cc42f969b9fee13a3ff52f4e5f98491c426f6c6bc5242d1ace926e264865a8cc1a30f127d1b1a6ec3de457575634cddnRhY2l1cw%3D%3D!userid_type:b64unicode; Domain=.localhost; Path=/

<html>
 <head>
  <title>302 Found</title>
 </head>
 <body>
  <h1>302 Found</h1>
  The resource was found at ; you should be redirected automatically.


 <link rel="stylesheet" type="text/css" href="http://localhost:6543/_debug_toolbar/static/toolbar/toolbar_button.css">

<div id="pDebug">
    <div  id="pDebugToolbarHandle">
        <a title="Show Toolbar" id="pShowToolBarButton"
           href="http://localhost:6543/_debug_toolbar/313339393832303434383337353834" target="pDebugToolbar">&#171; FIXME: Debug Toolbar</a>
    </div>
</div>
</body>
</html>
(Para correr este última petición podría ser necesario reiniciar nuestro servidor web de prueba)
Y otra vez nos hemos ahorrado escribir un formulario a la carrera con la opción -d de curl, con la que enviamos los datos que la aplicación requiere.

Lo que necesitamos de ahora en adelante es usar la opción -H para configurar a curl que use la cookie auth_tkt que nos envío como respuesta la aplicación en cada petición que necesite autenticación. Cuidado con usar comillas dobles para limitar el contenido de la cookie, desde consola existe el inconveniente de:
$ curl -w '\n' -X GET http://localhost:6543/ficheros/alortiz -H 'Cookie: auth_tkt=95c36928020725f40edd54266285117cfddd4d05b0cc42f969b9fee13a3ff52f4e5f98491c426f6c6bc5242d1ace926e264865a8cc1a30f127d1b1a6ec3de457575634cddnRhY2l1cw%3D%3D!userid_type:b64unicode'
{"respuesta": {"palabras": ["ambiente", "publico"], "nombre": "Alexander", "apellido": "Ort\u00edz"}}

Fuentes:
How to create high scalable web-backends for ios developers 0.0.0 documentation

domingo, 5 de junio de 2016

Automatizando las pruebas para Pyramid con Test Funcionales

Las pruebas funcionales son otra onda. La cuestión es que se encuentran un poco más arriba de la aplicación, casi que casi es como hacer las pruebas de toda la vida en el navegador,  así que hay poco control de la aplicación en sí, lo que de hecho no es tan malo después de todo, ya que tampoco el usuario final tendrá tanto control de nuestra aplicación.

Para nuestro proyecto, bastará con que crear el fichero ./ambiente/aplicacion/aplicacion/tests/testFuncionales.py con el siguiente contenido.
#coding:utf-8

from unittest import TestCase

class Listado(TestCase):
    def setUp(self):
        from aplicacion import main
        from webtest import TestApp
        
        app = main({})
        self.testapp = TestApp(app)
    
    def test_ficheros_listado(self):
        respuesta = self.testapp.get('/ficheros', status=200, xhr=True)
        self.assertEqual(respuesta.content_type, 'application/json')
        self.assertItemsEqual(respuesta.json_body['respuesta'], ['alortiz', 'kpenate', 'opineda']) 

class Detalle(TestCase):
    def setUp(self):
        from aplicacion import main
        from webtest import TestApp

        app = main({})
        self.testapp = TestApp(app)

    def test_ficheros_detalle(self):
        respuesta = self.testapp.get('/ficheros/' + 'alortiz', status=200, xhr=True)
        self.assertItemsEqual(respuesta.json_body['respuesta']['palabras'], ['ambiente', 'publico'])

    def test_ficheros_detalle_inexistente(self):
        respuesta = self.testapp.get('/ficheros/' + 'fitzcarraldo', status=200, xhr=True)
        self.assertEqual(respuesta.json_body['respuesta']['error'], 'No such file or directory')

class Creacion(TestCase):
    def setUp(self):
        from aplicacion import main
        from webtest import TestApp

        app = main({})
        self.testapp = TestApp(app)

    def tearDown(self):
        respuesta = self.testapp.delete('/ficheros/' + 'fcornejo', status=200)

    def test_ficheros_creacion(self):
        datos = {'usuario': 'fcornejo', 'data': {'nombre': 'Flor', 'apellido':'Cornejo', 'palabras': ['ente', 'obvio']}}
        respuesta = self.testapp.post_json('/ficheros', status=200, params=datos)
        self.assertDictEqual(respuesta.json_body['respuesta'], datos['data'])

class Modificacion(TestCase):
    def setUp(self):
        from aplicacion import main
        from webtest import TestApp

        app = main({})
        self.testapp = TestApp(app)

        datos = {'usuario': 'fcornejo', 'data': {'nombre': 'Flor', 'apellido':'Cornejo', 'palabras': ['ente', 'obvio']}}
        self.testapp.post_json('/ficheros', status=200, params=datos)
    
    def tearDown(self):
        respuesta = self.testapp.delete('/ficheros/' + 'fcornejo', status=200)
 
    def test_ficheros_modificacion(self):
        datos = {'usuario': 'fcornejo', 'data': {'nombre': 'Flor', 'apellido':'Cornejo', 'palabras': ['espejismo', 'olvido']}}
        respuesta = self.testapp.put_json('/ficheros/' + 'fcornejo', status=200, params=datos)
        self.assertDictEqual(respuesta.json_body['respuesta'], datos['data'])
       
class Borrado(TestCase):
    def setUp(self):
        from aplicacion import main
        from webtest import TestApp

        app = main({})
        self.testapp = TestApp(app)
        
        datos = {'usuario': 'fcornejo', 'data': {'nombre': 'Flor', 'apellido':'Cornejo', 'palabras': ['ente', 'obvio']}}
        self.testapp.post_json('/ficheros', status=200, params=datos)

    def test_ficheros_borrado(self):
        respuesta = self.testapp.delete('/ficheros/' + 'fcornejo', status=200)
        self.assertEqual(respuesta.json_body['respuesta'], 'fcornejo')
        
Al correr nosetest con un poco de verbosidad (Un -v más no añade nada importante, y el próximo es casi preocupante), la salida debería ser la siguiente:
$ nosetests -v
test_ficheros_borrado (aplicacion.tests.testFuncionales.Borrado) ... ok
test_ficheros_creacion (aplicacion.tests.testFuncionales.Creacion) ... ok
test_ficheros_detalle (aplicacion.tests.testFuncionales.Detalle) ... ok
test_ficheros_detalle_inexistente (aplicacion.tests.testFuncionales.Detalle) ... ok
test_ficheros_listado (aplicacion.tests.testFuncionales.Listado) ... ok
test_ficheros_modificacion (aplicacion.tests.testFuncionales.Modificacion) ... ok
test_ficheros_borrado (aplicacion.tests.testUnitarios.Borrado) ... ok
test_ficheros_borrado_inexistente (aplicacion.tests.testUnitarios.Borrado) ... ok
test_ficheros_creacion (aplicacion.tests.testUnitarios.Creacion) ... ok
test_ficheros_creacion_json_malformed (aplicacion.tests.testUnitarios.Creacion) ... ok
test_ficheros_detalle (aplicacion.tests.testUnitarios.Detalle) ... ok
test_ficheros_detalle_noexistente (aplicacion.tests.testUnitarios.Detalle) ... ok
test_ficheros_listado (aplicacion.tests.testUnitarios.Listado) ... ok
test_ficheros_modificacion (aplicacion.tests.testUnitarios.Modificacion) ... ok

Name                                  Stmts   Miss  Cover   Missing
-------------------------------------------------------------------
aplicacion.py                            10      0   100%   
aplicacion/fichero_app.py                 0      0   100%   
aplicacion/fichero_app/ficheros.py       46     11    76%   21-22, 36, 52-55, 69-72
aplicacion/tests.py                       0      0   100%   
aplicacion/tests/testFuncionales.py      60      0   100%   
aplicacion/tests/testUnitarios.py        99      0   100%   
aplicacion/views.py                       0      0   100%   
aplicacion/views/actividades.py          35      2    94%   43-44
-------------------------------------------------------------------
TOTAL                                   250     13    95%   
----------------------------------------------------------------------
Ran 14 tests in 1.231s

OK

        

viernes, 3 de junio de 2016

Automatizando las pruebas para Pyramid con Test Unitarios

Resulta que el test unitario es básicamente el doble del código que se pretende testar, pero así es la vida. El punto es que he supuesto algunas buenas prácticas que leía en alguna parte (O varias partes) y vergonzosamente he perdido los enlaces así que los incluyo luego.}
  • Cada test hace una sola cosa: Así que va un assert por cada método test_* 
  • Cada test es una unidad independiente: Lo que creo que tengo que revisar porque la duplicidad de código por ahora es un horror 
  • Los test no comparten fixtures: Pero es otra cosa que falta revisar porque sí 
  • Me apoyo en el uso de setUp y tearDown para inicializar/reinicializar el ambiente de cada test a valores conocidos
Creamos un paquete completo para los test unitarios, lo que es necesario sobre todo para ser capaces de invocar los demás paquetes de la aplicacion:
$ mkdir ./ambiente/aplicacion/aplicacion/tests/
$ touch ./ambiente/aplicacion/aplicacion/tests/__init__.py
$ ./ambiente/aplicacion/aplicacion/tests/testUnitarios.py
Por lo demás, el fichero ./ambiente/aplicacion/aplicacion/tests/testUnitarios.py queda de la siguiente forma:
#coding: utf-8 from unittest import TestCase

from unittest import TestCase
from pyramid import testing
from json import dumps

class Listado(TestCase):
    def setUp(self):
        self.config = testing.setUp()
    
    def tearDown(self):
        self.config = testing.tearDown()

    def test_ficheros_listado(self):
        from ..views.actividades import ficheros_listado
        peticion = testing.DummyRequest()
        respuesta = ficheros_listado(peticion)
        self.assertItemsEqual(respuesta['respuesta'], ['alortiz', 'kpenate', 'opineda'])

class Detalle(TestCase):
    def setUp(self):
        self.config = testing.setUp()

    def tearDown(self):
        self.config = testing.tearDown()
    
    def test_ficheros_detalle(self):
        from ..views.actividades import ficheros_detalle
        peticion = testing.DummyRequest()
        peticion.matchdict = {'usuario': 'alortiz'}
        respuesta = ficheros_detalle(peticion)
        self.assertItemsEqual(respuesta['respuesta']['palabras'], ['ambiente', 'publico'])

    def test_ficheros_detalle_noexistente(self):
        from ..views.actividades import ficheros_detalle
        peticion = testing.DummyRequest()
        peticion.matchdict = {'usuario': 'fitzcarraldo'}
        respuesta = ficheros_detalle(peticion)
        self.assertEqual(respuesta['respuesta']['error'], 'No such file or directory')

class Creacion(TestCase):
    def setUp(self):
        self.config = testing.setUp()

    def tearDown(self):
        self.config = testing.tearDown()
        from ..views.actividades import ficheros_borrado
        peticion = testing.DummyRequest()
        peticion.matchdict = {'usuario': 'fcornejo'}
        respuesta = ficheros_borrado(peticion)
    
    def test_ficheros_creacion (self):
        from ..views.actividades import ficheros_creacion
        from pyramid.request import Request
        contenido = dumps({'usuario': 'fcornejo', 'data': {'nombre': 'Flor', 'apellido':'Cornejo', 'palabras': ['ente', 'obvio']}})
        peticion = Request.blank('', {}, body = contenido)
        respuesta = ficheros_creacion(peticion)
        self.assertDictEqual(respuesta['respuesta'], {'nombre': 'Flor', 'apellido':'Cornejo', 'palabras': ['ente', 'obvio']})

    def test_ficheros_creacion_json_malformed(self):
        from ..views.actividades import ficheros_creacion
        from pyramid.request import Request
        contenido = "Esfuerzo mínimo para representar daño máximo"
        peticion = Request.blank('', {}, body = contenido)
        respuesta = ficheros_creacion(peticion)
        self.assertEqual(respuesta.status_code, 400)

class Modificacion(TestCase):
    def setUp(self):
        self.config = testing.setUp()
        from ..views.actividades import ficheros_creacion
        from pyramid.request import Request
        contenido = dumps({'usuario': 'fcornejo', 'data': {'nombre': 'Flor', 'apellido':'Cornejo', 'palabras': ['ente', 'obvio']}})
        peticion = Request.blank('', {}, body=contenido)
        respuesta = ficheros_creacion(peticion)
        self.assertDictEqual(respuesta['respuesta'], {'nombre': 'Flor', 'apellido':'Cornejo', 'palabras': ['ente', 'obvio']})
    
    def tearDown(self):
        self.config = testing.tearDown()
        from ..views.actividades import ficheros_borrado
        peticion = testing.DummyRequest()
        peticion.matchdict = {'usuario': 'fcornejo'}
        respuesta = ficheros_borrado(peticion)

    def test_ficheros_modificacion(self):
        from ..views.actividades import ficheros_modificacion
        from pyramid.request import Request
        contenido = dumps({'usuario': 'fcornejo', 'data': {'nombre': 'Flor', 'apellido':'Cornejo', 'palabras': ['especial', 'elemental']}})
        peticion = Request.blank('', {}, body=contenido)
        peticion.matchdict = {'usuario': 'fcornejo'}
        respuesta = ficheros_modificacion(peticion)
        self.assertDictEqual(respuesta['respuesta'], {'nombre': 'Flor', 'apellido':'Cornejo', 'palabras': ['especial', 'elemental']})
    
class Borrado(TestCase):
    def setUp(self):
        self.config = testing.setUp()
        from ..views.actividades import ficheros_creacion
        from pyramid.request import Request
        contenido = dumps({'usuario': 'fcornejo', 'data': {'nombre': 'Flor', 'apellido':'Cornejo', 'palabras': ['especial', 'elemental']}})
        peticion = Request.blank('', {}, body=contenido)
        respuesta = ficheros_creacion(peticion)
        self.assertDictEqual(respuesta['respuesta'], {'nombre': 'Flor', 'apellido':'Cornejo', 'palabras': ['especial', 'elemental']}) 
    
    def tearDown(self):
        self.config = testing.tearDown()

    def test_ficheros_borrado(self):
        from ..views.actividades import ficheros_borrado
        peticion = testing.DummyRequest()
        peticion.matchdict = {'usuario': 'fcornejo'}
        respuesta = ficheros_borrado(peticion)
        self.assertEqual(respuesta['respuesta'], 'fcornejo')
    
    def test_ficheros_borrado_inexistente(self):
        from ..views.actividades import ficheros_borrado
        peticion = testing.DummyRequest()
        peticion.matchdict = {'usuario': 'noexistente'}
        respuesta = ficheros_borrado(peticion)
        self.assertEqual(respuesta['respuesta']['error'], 'No such file or directory')

Al ser unidades separadas, cada test fallará por si mismo. Mi idea al inicio era la de crear, modificar y eliminar el mismo objeto. Pero resulta que eso no es tan buena idea como parece, para empezar porque los test, dentro de cada clase (E incluso las clases dentro de los paquetes) se ordenan por nombre al ejecutarse, lo que me daría nombres bien feos a todos los test. Pero por la misma idea de que cada test debe ser tan independiente como le sea posible

miércoles, 1 de junio de 2016

Testeando ruta PUT en Pyramid

Sigo con mi aplicación que simula ser una API REST. Estoy haciendo cambios sobre la marcha y me quedé estancado en el verbo PUT. Se supone que el controlador asociado a dicho verbo debería ser de la siguiente forma:
@view_config(route_name='put_ficheros_modificacion', renderer='json')
def put_ficheros_modificacion(request):
    usuario = request.matchdict['fichero']
    data = request.POST.get('data')
    ficheros = Ficheros()
    modificacion_fichero = ficheros.modificacion(usuario, data)
    return {'ficheroModificacion': modificacion_fichero}
Y la prueba unitaria correspondiente debe usar un Request y no un DummyRequest
def test_put_ficheros_modificacion(self):
        from .views.actividades import put_ficheros_modificacion
        from pyramid.request import Request
        fixture = self.fixture['put_ficheros_modificacion']

        parametros = {'data':fixture['data']}
        peticion = Request.blank('', {}, POST=parametros)       
        peticion.matchdict = {'fichero':'fcornejo'}

        respuesta = put_ficheros_modificacion(peticion)
        self.assertEqual(respuesta['ficheroModificacion']['palabras'], fixture['data']['palabras'])

Básicamente sirve. Sin embargo, de esta forma, lo que se envía es un parámetro data que contiene por contenido al diccionario que guardo en mi fixture pero como texto. Así que tengo un error como el siguiente:
$ nosetests
....E
======================================================================
ERROR: test_put_ficheros_modificacion (aplicacion.tests.ViewTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/var/www/ambiente/aplicacion/aplicacion/tests.py", line 81, in test_put_ficheros_modificacion
    self.assertEqual(respuesta['ficheroModificacion']['palabras'], fixture['data']['palabras'])
TypeError: string indices must be integers
Precisamente porque la respuesta que estoy recibiendo en respuesta por parte de la aplicación es texto. Pues que esto se solucionaría cuidando un poco más los parámetros POST, básicamente creo que soportaría con el diccionario. Pero, eso haría que tuviera que hacer un request.POST.get() por cada clave. Que sería una buena idea para validar datos en el controlador, aunque supongo que habrá después otra forma

Total que mi gran solución es que el cliente envíe todo como json. Pues sí, creo que es la mejor idea posible. El controlador asociado queda de la siguiente forma:
@view_config(route_name='put_ficheros_modificacion', renderer='json')
def put_ficheros_modificacion(request):
    usuario = request.matchdict['fichero']
    data = request.json_body['data']
    ficheros = Ficheros()
    modificacion_fichero = ficheros.modificacion(usuario, data)
    return {'ficheroModificacion': modificacion_fichero}
Tampoco el test unitario ha cambiado tanto, excepto por configurar body en lugar de POST
    def test_put_ficheros_modificacion(self):
        from .views.actividades import put_ficheros_modificacion
        from pyramid.request import Request
        import json
        fixture = self.fixture['put_ficheros_modificacion']


        parametros = json.dumps({'data':fixture['data']})
        peticion = Request.blank('', {}, body=parametros)  
        peticion.matchdict = {'fichero':fixture['usuario']}

        respuesta = put_ficheros_modificacion(peticion)
        self.assertEqual(respuesta['ficheroModificacion']['palabras'], fixture['data']['palabras'])
Así que ahora el cliente debe enviar datos JSON. Lo que es mantiene todo simple y divertido a la hora de usar curl Supongo que eso abre la necesidad de validar el formato y cosas por el estilo, pero eso vendrá después. Y

Fuentes:
pyramid.request
Request and Response Objects

lunes, 30 de mayo de 2016

Verificando problemas de conexión a servidores de correo externos

Administrar un servidor de correo tiene un poco de arte entre tanta técnica. Descubrir problemas una vez nuestro servidor dice que todo esta bien (Para aprender sobre lo que el servidor dice estar bien, Interpretación de maillog en Postfix es una lectura bastante recomendable) puede ser una tarea casi imposible que involucra otras aspectos de la comunicación entre servidores.

Por otra parte, al tratar con un servidor como Zimbra, los registros pueden ser extensos al punto de causar un poco de confusión. Sin embargo, el flujo de cualquier correo que se dirige a un servidor externo siempre debería terminar con lineas como las siguientes de parte del componente smtp de postfix:
May 30 08:55:54 mail postfix/smtp[51709]: A5E3A49215DB: to=<cuenta@gmail.com>, relay=gmail-smtp-in.l.google.com[74.125.21.27]:25, delay=2, delays=0.16/0/0.54/1.3, dsn=2.0.0, status=sent (250 2.0.0 OK 1464620154 s81si15913905ybb.199 - gsmtp)
Como puede verse, la línea en sí tiene la siguiente forma:
A5E3A49215DB: to=<cuenta@servicio.com>, relay=servidor_externo[ip_servidor_externo], delay=2, delays=0.16/0/0.54/1.3, dsn=2.0.0, status=sent (250 2.0.0 OK mensaje_opcional)
Tener en cuenta que mensaje opcional es bastante propio de cada servicio de correo, y suelen ser bastante diferentes:
May 30 08:54:19 mail postfix/smtp[51710]: C126C49A603B: to=<cuenta@yahoo.es>, relay=mx-eu.mail.am0.yahoodns.net[188.125.69.79]:25, delay=2.6, delays=0.04/0.01/1.2/1.3, dsn=2.0.0, status=sent (250 ok dirdel)
May 30 08:53:22 mail postfix/smtp[12873]: 692BF4983168: to=<cuenta@hotmail.com>, relay=mx4.hotmail.com[65.55.92.168]:25, delay=5.2, delays=0.11/0.02/1.3/3.7, dsn=2.0.0, status=sent (250  <1740202306.276413.1464619995882.JavaMail.zimbra@dominio_local> Queued mail for delivery)
Pues que la costumbre podría llevarnos a conocer la respuesta específica de cada uno. Lo que cual puede ser importante ya que en cuento recibimos dicha respuesta, estamos complemente seguros de que el servidor externo ha recibido el correo. No queda de otra, que lo envíe a Spam ya es otra cuestión pero al menos sabemos no tenemos problemas de comunicación entre nuestro servidor y el de tal servidor del sistema de correo externo.

Ah, pero es posible que existan un problema si por "casualidad" tu proveedor ha configurado alguna especie de filtro de correo súper avanzado (Pero que irónicamente ya no tiene soporte). Porque en ese caso quién realiza la conexión con el servidor externo será dicho gateway y no tu servidor directamente. Lo cual te deja en la mala posición de no tener un control total sobre la forma en que se filtra tu correo. Con un filtro como este, es bastante difícil darse cuenta del problema ya que el mismo responde al componente smtp de nuestro servidor con un mensaje 250, y sin embargo no garantiza para nada que los correos pasen el filtro y mucho menos nos avisa de ello. Como si no fuera poco, este tipo de filtros de correos incumplen las normativas del protocolo porque no agregan cabeceras de su actividad en el correo, con lo que su detección se hace difícil. 

La forma más sencilla de verificar que la conexión a los servidores de correos externos es totalmente directa, es revisar la forma en que se empieza la negociación TLS con servidores en los que esto es posible (Lo que hoy en día constituyen la inmensa mayoría de servicios serios)
$ dig mx google.com.sv +short
10 google.com.s9a1.psmtp.com.
10 google.com.s9a2.psmtp.com.
10 google.com.s9b2.psmtp.com.
10 google.com.s9b1.psmtp.com.

$ openssl s_client -starttls smtp -crlf -connect google.com.s9a1.psmtp.com:25
CONNECTED(00000003)
depth=2 C = US, O = GeoTrust Inc., CN = GeoTrust Global CA
verify error:num=20:unable to get local issuer certificate
verify return:0
---
Certificate chain
 0 s:/C=US/ST=California/L=Mountain View/O=Google Inc/CN=mx.google.com
   i:/C=US/O=Google Inc/CN=Google Internet Authority G2
 1 s:/C=US/O=Google Inc/CN=Google Internet Authority G2
   i:/C=US/O=GeoTrust Inc./CN=GeoTrust Global CA
 2 s:/C=US/O=GeoTrust Inc./CN=GeoTrust Global CA
   i:/C=US/O=Equifax/OU=Equifax Secure Certificate Authority
---
Server certificate
-----BEGIN CERTIFICATE-----
MIIGoTCCBYmgAwIBAgIIbvTocO4F/oMwDQYJKoZIhvcNAQELBQAwSTELMAkGA1UE
BhMCVVMxEzARBgNVBAoTCkdvb2dsZSBJbmMxJTAjBgNVBAMTHEdvb2dsZSBJbnRl
cm5ldCBBdXRob3JpdHkgRzIwHhcNMTYwNTE4MTEwNzU4WhcNMTYwODEwMTA0NjAw
WjBnMQswCQYDVQQGEwJVUzETMBEGA1UECAwKQ2FsaWZvcm5pYTEWMBQGA1UEBwwN
TW91bnRhaW4gVmlldzETMBEGA1UECgwKR29vZ2xlIEluYzEWMBQGA1UEAwwNbXgu
Z29vZ2xlLmNvbTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAJC2mSIn
I9NGj7WxngxCWH24lvQvVgNOA6ZZdkQkpcnD7mmXw78E0irxXnIuNjyu3OEVbkyK
XJdn+nBYRCiB5a7bUcIM2Z0T9uzfvaiJoEAAWnWkBi9o5Yr6At3VCR9JnHaU8RWD
VLo0JpROn+gJqQM64lF2X0PHo8sXFAnow3ySPBcSC3wthUrT0UptBW6EWh8vS4IL
4iWbsbKjpqPCMHaCmOPBz+tFqTeCuOlt5uC0f0PtNTzXG00pUffClrlTrRudRbkb
XVPncWJYoPSVO1HlLcBLHOCaUQS4rJKfrhnqXQTlEQSu0gH+UiiX2leq4kOtzNJv
/B4bBEycTZpAbqcCAwEAAaOCA20wggNpMB0GA1UdJQQWMBQGCCsGAQUFBwMBBggr
BgEFBQcDAjCCAjkGA1UdEQSCAjAwggIsgg1teC5nb29nbGUuY29tghdhbHQxLmFz
cG14LmwuZ29vZ2xlLmNvbYIfYWx0MS5nbWFpbC1zbXRwLWluLmwuZ29vZ2xlLmNv
bYIdYWx0MS5nbXItc210cC1pbi5sLmdvb2dsZS5jb22CF2FsdDIuYXNwbXgubC5n
b29nbGUuY29tgh9hbHQyLmdtYWlsLXNtdHAtaW4ubC5nb29nbGUuY29tgh1hbHQy
Lmdtci1zbXRwLWluLmwuZ29vZ2xlLmNvbYIXYWx0My5hc3BteC5sLmdvb2dsZS5j
b22CH2FsdDMuZ21haWwtc210cC1pbi5sLmdvb2dsZS5jb22CHWFsdDMuZ21yLXNt
dHAtaW4ubC5nb29nbGUuY29tghdhbHQ0LmFzcG14LmwuZ29vZ2xlLmNvbYIfYWx0
NC5nbWFpbC1zbXRwLWluLmwuZ29vZ2xlLmNvbYIdYWx0NC5nbXItc210cC1pbi5s
Lmdvb2dsZS5jb22CEmFzcG14LmwuZ29vZ2xlLmNvbYIVYXNwbXgyLmdvb2dsZW1h
aWwuY29tghVhc3BteDMuZ29vZ2xlbWFpbC5jb22CFWFzcG14NC5nb29nbGVtYWls
LmNvbYIVYXNwbXg1Lmdvb2dsZW1haWwuY29tghpnbWFpbC1zbXRwLWluLmwuZ29v
Z2xlLmNvbYIRZ21yLW14Lmdvb2dsZS5jb22CGGdtci1zbXRwLWluLmwuZ29vZ2xl
LmNvbTBoBggrBgEFBQcBAQRcMFowKwYIKwYBBQUHMAKGH2h0dHA6Ly9wa2kuZ29v
Z2xlLmNvbS9HSUFHMi5jcnQwKwYIKwYBBQUHMAGGH2h0dHA6Ly9jbGllbnRzMS5n
b29nbGUuY29tL29jc3AwHQYDVR0OBBYEFIX2Hm9qeZHky5XG7V5xu7fBEovoMAwG
A1UdEwEB/wQCMAAwHwYDVR0jBBgwFoAUSt0GFhu89mi1dvWBtrtiGrpagS8wIQYD
VR0gBBowGDAMBgorBgEEAdZ5AgUBMAgGBmeBDAECAjAwBgNVHR8EKTAnMCWgI6Ah
hh9odHRwOi8vcGtpLmdvb2dsZS5jb20vR0lBRzIuY3JsMA0GCSqGSIb3DQEBCwUA
A4IBAQBBlIRHixSPzCY/GS+PHpc5wDMaquNvCLKFreEnMWkH7tN7lmcyC/f8G6Kq
rtR7TdD143pSGf0nxECNFV6QA6DdAKJZ/NWu5YVK/wL/FUHedNjLfO0jOXz7DoWX
M+T0hQmEwIRPHgjGN/WjDF0YiL73wLn3Ji1fYKcE/OVmSROqc7X2F7C03DxbikW5
fBjXOpRUc+pDVP2BqNRj/opS8VsXDRb7jr1QjEciWyui29tbhoG/byhvKn3SE2/J
ThABmqJo3gW5jwaKew/7R49pm2Vvxk4Nw5wAYLhHVbKHCRjZZtjZeVAU0uUtjk8i
8SvwX5g8pDqEhtsRi1aOWU5+AP56
-----END CERTIFICATE-----
subject=/C=US/ST=California/L=Mountain View/O=Google Inc/CN=mx.google.com
issuer=/C=US/O=Google Inc/CN=Google Internet Authority G2
---
No client certificate CA names sent
---
SSL handshake has read 4525 bytes and written 450 bytes
---
New, TLSv1/SSLv3, Cipher is ECDHE-RSA-AES128-GCM-SHA256
Server public key is 2048 bit
Secure Renegotiation IS supported
Compression: NONE
Expansion: NONE
SSL-Session:
    Protocol  : TLSv1.2
    Cipher    : ECDHE-RSA-AES128-GCM-SHA256
    Session-ID: D413E1918D87A8E1CDCE37606F539CBF9CD6F79F1BED4F58D0B03E9801738ACF
    Session-ID-ctx: 
    Master-Key: 771FA531A831086DF2311DA9F6807FABAB77022421BEEA247A241766850C53AC85295112AC44250232E21A779A1B2985
    Key-Arg   : None
    PSK identity: None
    PSK identity hint: None
    SRP username: None
    TLS session ticket lifetime hint: 100800 (seconds)
    TLS session ticket:
    0000 - 92 4d 45 c8 99 89 6f 92-27 10 30 69 a5 63 0f cd   .ME...o.'.0i.c..
    0010 - 1d f6 3e 72 43 a5 b3 ae-25 3a c6 94 e5 a2 df e1   ..>rC...%:......
    0020 - 92 76 d4 2d 4d cf 4b b9-f5 4c bd 4b 51 10 c8 48   .v.-M.K..L.KQ..H
    0030 - e5 a5 15 75 15 a1 92 bc-cd cb 64 c0 c9 5e c1 78   ...u......d..^.x
    0040 - 2d 51 13 82 d4 c6 cc 33-84 39 7e 09 85 9c 6b 26   -Q.....3.9~...k&
    0050 - d5 86 5b be 61 4c b0 95-64 07 0a 1d f6 5d 60 de   ..[.aL..d....]`.
    0060 - fe 37 21 58 d6 e9 35 a4-a3 90 76 63 2d e3 d0 ba   .7!X..5...vc-...
    0070 - 80 64 4f 5a 37 08 27 b4-7e 2c e9 eb 53 ab cd cc   .dOZ7.'.~,..S...
    0080 - e3 25 ad db 26 0e c8 27-7b d4 a2 c0 82 60 05 28   .%..&..'{....`.(
    0090 - 8f dd d6 d2 8d bf c0 18-72 c1 1c d0 85 21 72 92   ........r....!r.
    00a0 - 72 92 c1 8a                                       r...

    Start Time: 1464621046
    Timeout   : 300 (sec)
    Verify return code: 20 (unable to get local issuer certificate)
---
250 SMTPUTF8
quit
221 2.0.0 closing connection w127si10551741ywc.301 - gsmtp
read:errno=0

viernes, 27 de mayo de 2016

Configurando TLS para envío a servidores externos en Zimbra

Desde hace un par de meses  para acá, a Google le pareció muy buena idea señalar a aquellos servicios de correos que no iniciaran TLS cuando se comunicarán con sus servidores:
Este infame candado rojo venía a decir que el dominio no era capaz de cifrar la comunicación cuando enviaba correos por el puerto 25 a gmail (Lo que dicho sea de paso es la forma en que esto ha funcionado toda la vida, el eterno chiste sobre la seguridad de nuestros correos electrónicos).
Para solventar este problema en zimbra, dependemos un poco de la versión:

  • Zimbra 8.6 (No lo he probé en versiones anteriores):
    Ya esta activado por defecto y todo bien.
  • Zimbra 8.0.6 (Cuidado con versiones anteriores, yo supondría que funciona de 8.0.6 en adelante)
    Es necesario correr configurarlo de la siguiente forma:
$ zmlocalconfig -e postfix_smtp_tls_security_level=may
$ zmlocalconfig -s postfix_smtp_tls_security_level
postfix_smtp_tls_security_level = may
Y con eso quedaría configurado. Lo más importante aparte de que ya no sale el candado rojo
 Y claro, las cabeceras han cambiado desde cuando no cifraba

 A algo más bonito e interesante
Por otra parte, el anterior correo es una especie de reenvío que hago de mi cuenta hacia mi cuenta de gmail, pero el correo enviado directamente se ve más bonito para google:

Fuentes:
Zimbra : Outbound smtp TLS encryption
Postconf keys
Using TLS for mail delivery from postfix to another TLS activated mail server
Best-Practice Recommendations for a Secure Zimbra Configuration

viernes, 20 de mayo de 2016

De como publicamos con pyramid una aplicación python

Todo el trabajo de ahora en adelante será el de usar la clase Usuarios como el core de nuestra aplicación, mientras que pyramid se encarga de publicarla webmente.

Básicamente, lo que se necesita es configurar las rutas en ./ambiente/aplicacion/aplicacion/__init__.py, mediante config.add_route()
    
    # Cuando request_method se encuentra acá, es devuelto un error como         
    # "The resource could not be found."                                        
    # Así que es como mejor configurar desde acá a request_method
    config.add_route('ficheros_listado', '/ficheros', request_method='GET')
Y luego escribir el controlador (Que acá se llaman vistas) para dicha ruta.
@view_config(route_name='ficheros_listado', renderer='json')                    
def ficheros_listado(request):                                                  
    """Cuando request_method se configura acà, el mensaje es diferente porque   
    la operación alcanzada es diferente                                         
        The resource could not be found.

predicate mismatch for view get_listar_ficheros (request_method = GET,HEAD)
    Por tanto lo mejor es configurarlo allá en __init__                         
    """                                                                         
    ficheros = Usuarios()                                                       
    listado = ficheros.listado()                                                
    return {'respuesta': listado}
Para no alargar demasiado, __init__.py queda de la siguiente forma:
# coding: utf-8                                                                 
from pyramid.config import Configurator                                         
                                                                                
                                                                                
def main(global_config, **settings):                                            
    """ This function returns a Pyramid WSGI application.                       
    """                                                                         
    config = Configurator(settings=settings)                                    
    # Cuando request_method se encuentra acá, es devuelto un error como         
    # "The resource could not be found."                                        
    # Así que es como mejor configurar desde acá a request_method               
    config.add_route('ficheros_listado', '/ficheros', request_method='GET')     
    config.add_route('ficheros_detalle', '/ficheros/{usuario}', request_method='GET')
    config.add_route('ficheros_creacion', '/ficheros', request_method='POST')   
    config.add_route('ficheros_modificacion', '/ficheros/{usuario}', request_method='PUT')
    config.add_route('ficheros_borrado', '/ficheros/{usuario}', request_method='DELETE')
    config.scan()                                                               
    return config.make_wsgi_app()
Y el fichero ./ambiente/aplicacion/aplicacion/views/actividades.py con las vistas (Que en realidad son controladores), queda de la siguiente forma:
# coding: utf-8
from pyramid.view import view_config
from ..fichero_app.ficheros import Usuarios
from pyramid import httpexceptions as exception

@view_config(route_name='ficheros_listado', renderer='json')
def ficheros_listado(request):
    """Cuando request_method se configura acà, el mensaje es diferente porque 
    la operación alcanzada es diferente
        The resource could not be found.


     predicate mismatch for view get_listar_ficheros (request_method = GET,HEAD)
    Por tanto lo mejor es configurarlo allá en __init__
    """
    ficheros = Usuarios()
    listado = ficheros.listado()
    return {'respuesta': listado}

@view_config(route_name='ficheros_detalle', renderer='json')
def ficheros_detalle(request):
    usuario = request.matchdict['usuario']
    ficheros = Usuarios()
    detalle = ficheros.detalle(usuario)
    return {'respuesta': detalle}

@view_config(route_name='ficheros_creacion', renderer='json')
def ficheros_creacion(request):
    try:
        usuario = request.json_body['usuario']
        data = request.json_body['data']
    except Exception as e:
        return exception.HTTPBadRequest()
    ficheros = Usuarios()
    creacion = ficheros.creacion(usuario, data)
    return {'respuesta': creacion}

@view_config(route_name='ficheros_modificacion', renderer='json')
def ficheros_modificacion(request):
    usuario = request.matchdict['usuario']
    try:
        data = request.json_body['data']
    except Exception as e:
        return exception.HTTPBadRequest()
    ficheros = Usuarios()
    modificacion = ficheros.modificacion(usuario, data)
    return {'respuesta': modificacion}

@view_config(route_name='ficheros_borrado', renderer='json')
def ficheros_borrado(request):
    usuario = request.matchdict['usuario']
    ficheros = Usuarios()
    borrado = ficheros.borrado(usuario)
    return {'respuesta': borrado}
Eso es todo. Con lo anterior nuestra clase Usuarios ha sido publicada a la web

Para ver que esto es cierto corremos el servidor web integrado desde el directorio raíz de la aplicación en ./ambiente/aplicacion (Que queda bien para desarrollo, pero no es la solución definitiva cuando lo vayamos a tirar en producción)
pserve development.ini --reload
En este punto ya podemos probar la aplicación desde la web. Bien podría ser desde un navegador, pero no. Como la idea es que esto funcione como una API, no deberíamos ocuparnos de crear interfaz web alguna, algo que sería necesario para verificar muchas de las operaciones que hemos definido. Lo más sencillo en este caso es crear las peticiones HTTP desde consola mediante curl, herramienta que sencillamente cambia la vida en cuanto se le toma práctica.
  • Para verificar el listado de ficheros en la raíz de ficheros/ mediante GET, especificamos el verbo HTTP con la opción -X
$ curl -X GET http://localhost:6543/ficheros -w "\n" 
{"respuesta": ["alortiz", "kpenate", "opineda"]}
  • Verifica la obtención de los datos particulares de un usuario en ficheros/{usuario}
$ curl -X GET http://localhost:6543/ficheros/alortiz -w "\n"
{"respuesta": {"palabras": ["ambiente", "publico"], "nombre": "Alexander", "apellido": "Ort\u00edz"}}
  • Creamos un usuario mediante el POST de ficheros/. Atención a como añadimos la cabecera content-type con -H,  y el valor que le hemos configurado. Luego, la opción -d nos permite configurar datos casi en bruto. El dato que enviamos es precisamente una cadena de texto que representa un JSON válido
$ curl -i -X POST http://localhost:6543/ficheros -w "\n" -H "content-type: application/json; charset=UTF-8" -d '{"data": {"palabras": ["estruendo", "epilepsia"], "nombre": "Francisco", "apellido": "Cornejo"}, "usuario": "fcornejo"}' 
{"respuesta": {"palabras": ["estruendo", "epilepsia"], "nombre": "Francisco", "apellido": "Cornejo"}}

$ curl -X GET http://localhost:6543/ficheros/fcornejo -w "\n"
{"respuesta": {"palabras": ["estruendo", "epilepsia"], "nombre": "Francisco", "apellido": "Cornejo"}}
  • Ahora modificamos al usuario fcornejo antes creado mediante el verbo PUT
$ curl -X PUT http://localhost:6543/ficheros/fcornejo -H "content-type: application/json" -d '{"data": {"palabras": ["esfuerzo", "historia"], "nombre": "Francisco", "apellido": "Cornejo"}, "usuario": "fcornejo"}' -w "\n"
{"respuesta": {"palabras": ["esfuerzo", "historia"], "nombre": "Francisco", "apellido": "Cornejo"}}
 
$ curl -X GET http://localhost:6543/ficheros/fcornejo -w "\n"
{"respuesta": {"palabras": ["esfuerzo", "historia"], "nombre": "Francisco", "apellido": "Cornejo"}}
  • Y por ultimo borramos al usuario fcornejo con el verbo DELETE
$ curl -X GET http://localhost:6543/ficheros -w "\n"
{"respuesta": ["alortiz", "kpenate", "opineda", "fcornejo"]}

$ curl -X DELETE http://localhost:6543/ficheros/fcornejo -w "\n"
{"respuesta": "fcornejo"}

$ curl -X GET http://localhost:6543/ficheros -w "\n"            
{"respuesta": ["alortiz", "kpenate", "opineda"]}

miércoles, 18 de mayo de 2016

La pequeña y estúpida aplicación fichero_app

Tomo un momento para hablar de una pequeña aplicación que vamos a integrar con pyramid. Integrar quizá no: Vamos a publicar la aplicación con pyramid.

Como se verá, la aplicación es totalmente funcional por si misma. Bueno, que si se quiere ver es una pequeña biblioteca, casi inútil desde un punto de vista práctico. Vamos, que ni siquiera esta capturando excepciones. Sin embargo, yo voy a suponer que esta aplicación es la joya de la corona, que dentro de cada función encierra grandes operaciones de tratamiento de ficheros que me gustaría que estuvieran disponibles para otras aplicaciones, sobre todo, para un cliente web que funcione con AngularJS

Con ustedes, la aplicación ficheros.py:
# coding: utf-8
 
import os
import json
 
class Usuarios():
    """ Manejo de usuarios por medio de ficheros json
    """

    def __init__(self):
        self.direccion = os.getcwd() + '/archivos/'
     
    def listado(self):
        """ Retorna lista con usuarios registrados
        Returns:
            listado: (list) Lista de usuarios registrados
        """
        try:
            lista = os.listdir(self.direccion)
            usuarios = [usuario.split('.')[0] for usuario in lista]
        except IOError as e:
            return {'error': e.args}
        return usuarios
     
    def detalle(self, usuario):
        """ Retorna diccionario con información de usuario
        Args:
            usuario: (str) usuario del que se desean detalles
        Returns:
            dict: Información del usuario
        """
        try: 
            fichero = open(self.direccion + '/' + usuario + '.json') 
            contenido = json.load(fichero)
        except ValueError as e:
            return {'error': e.message}
        except IOError as e:
            return {'error': e.args[1]}
        return contenido
    
    def creacion(self, usuario, datos):
        """ Retorna diccionario de datos con los que se ha creado al usuario
        Args:
            usuario: (str) nombre del usuario a crear
            datos: (dict) datos de la información del usuario
        Returns:
            dict: Datos con los que se ha creado al usuario
        """
        try:
            fichero = open(self.direccion + '/' + usuario + '.json', 'w')
            contenido = json.dump(datos, fichero)
        except ValueError as e:
            return {'error': e.message}
        except IOError as e:
            return {'error': e.args[1]}
        return datos
     
    def modificacion(self, usuario, datos):
        """ Retorna diccionario con los datos modificados del usuario
        Args:
            usuario: (str) nombre del usuario a modificar
            datos: (dict) datos a modificar del usuario
        Returns:
            dict: Datos con los que se ha creado al usuario
        """
        try:
            fichero = open(self.direccion + '/' + usuario + '.json', 'w+')
            contenido = json.dump(datos, fichero)
        except ValueError as e:
            return {'error': e.message}
        except IOError as e:
            return {'error': e.args[1]}
        return datos
    
    def borrado(self, usuario):
        """ Retorna nombre de usuario borrado
        Args: 
            usuario: (str) nombre del usuario a borrar
        Returns:
            usuario: (str) nombre del usuario borrado
        """
        try:
            fichero =  self.direccion + '/' + usuario + '.json'
            os.remove(fichero)
        except OSError as e:
            return {'error': e.args[1]}
        return usuario

Como puede intuirse del código, tenemos cinco operaciones definidas.
  • Listar los ficheros que se encuentran en ./archivos/ (listar())
  • Ver en detalle un fichero .json que se encuentra dentro de ./archivos/ (detalle()), y que básicamente consiste en retornar el contenido json del fichero traducido a un tipo python válido.
  • Crear un nuevo fichero json dentro de ./archivos/ (creacion())
  • Modificar un fichero json, reemplazando todo el contenido actual con el contenido que tiene las modificaciones ya aplicadas. Parece ser que la precisión en las definiciones es algo importante
  • Borrar un fichero json dentro de ./archivos/ 
Que sí, que me estoy inspirando en CRUD para hacer esto. Es una forma muy cómoda para pensar una aplicación.

Para empezar a usar la aplicación, bastaría con crear el directorio ./archivos/ junto con algunos ficheros de prueba. Pues que lo que viene a decir la documentación es que cada método de la aplicación recibe, en su mayoría, un usuario y un diccionario datos. Y en su mayoría retorna otro diccionario
$ mkdir ./archivos
$ echo '{"palabras": ["ambiente", "publico"], "nombre": "Alexander", "apellido": "Ort\u00edz"}' > ./archivos/alortiz.json
$ echo '{"palabras": ["contenido", "fichero"], "nombre": "Karen", "apellido": "Pe\u00f1ate"}' > archivos/kpenate.json
$ echo '{"palabras": ["posix", "permisos"], "nombre": "Olga", "apellido": "Pineda"}' > archivos/opineda.json
Por último, convertiremos esta diminuta aplicación en una paquete python válido. Tomando en cuenta lo sencillo que es en python: Creamos un directorio, ubicamos todos los ficheros creados dentro y creamos un ficheros __init__.py
$ mkdir ./fichero_app
$ mv archivos/ ficheros.py fichero_app/
$ touch fichero_app/__init__.py
Un script python que tenga el mismo directorio raíz que fichero_app (En este momento, ./ambiente/aplicacion/aplicacion/fichero_app), puede importar la clase Fichero de la siguiente forma:
from fichero_app.ficheros import Usuarios
La cuestión sobre el directorio a usar es un poco espinosa, producto de no parametrizarla como debiera. El truco es que la búsqueda del directorio ficheros ocurrirá en la base desde donde se ejecute la aplicación que llama a nuestra pequeña aplicación como módulo.

El siguiente paso va en la línea de la guía con Pyramid: De como publicamos con pyramid una aplicación python

lunes, 16 de mayo de 2016

Primeros pasos con Pyramid

Que quiero hacer con python una una aplicación web simple y precisa, que no consuma de una base de datos y que tampoco produzca HTML sino contenido JSON (Con lo puedo usar AngularJS para el respectivo cliente web). Así que decidí usar Pyramid, que tenía algunas de las cosas que a primera vista necesito (O más bien que creo necesitar).

La siguiente guía está en constante desarrollo. Básicamente estoy haciendo los primeros pasos después de revisar la guía oficial más completa
La recomendación oficial es crear un entorno virtual en el cual instalar los paquetes
$ virtualenv ambiente
$ cd ambiente/
Activamos el entorno, lo que no significa más que configurar unas cuantas variables del sistema para que usen el ambiente virtual, lo que puede verificarse por curiosidad al revisar el path correspondiente a pip
$ source bin/activate
$ which pip
/home/vtacius/public_html/ambiente/bin/pip
Lo siguiente será instalar pyramid, y para empezar bien y rápido, usaremos pcreate para el scaffolding (Término que no es del todo correcto en estas condiciones) de la aplicación
$ pip install pyramid
$ pcreate -s starter aplicacion
$ cd aplicacion/
Este nivel de la aplicación (./ambiente/aplicacion/) aprovechamos para cambiar el fichero setup.py para retirar en el arreglo requires a pyramid_chameleon porque es el paquete para producir HTML por medio de plantillas, y no pienso usarlo. Agregamos nose y webtest porque no se han incluido en el proyecto
import os

from setuptools import setup, find_packages

here = os.path.abspath(os.path.dirname(__file__))
with open(os.path.join(here, 'README.txt')) as f:
    README = f.read()
with open(os.path.join(here, 'CHANGES.txt')) as f:
    CHANGES = f.read()

requires = [
    'pyramid',
    'pyramid_debugtoolbar',
    'waitress',
    'nose',
    'webtest'
    ]

(...)
Ahora instalamos todos los paquetes requeridos por la aplicación en nuestro entorno virtual:
$ python setup.py develop
A este nivel (./ambiente/aplicacion/) hay que volver cuando se quiere realizar los test y correr waitress.

Ahora entramos dentro de nuestra aplicación propiamente, llamada aplicacion, y acomodamos según nuestras necesidades: eliminamos el directorio template y static porque no pienso usarlo, y volvemos un paquete a views porque planeo crecer con la aplicación aunque sea un poquito:
$ cd aplicacion/
$ rm -rf views.py templates/ static
$ mkdir views
$ touch views/__init__.py
Del fichero __init__.py eliminamos las referencias a pyramid_chameleon, y la configuración de contenido estático que nos es innecesaria:
    config.include('pyramid_chameleon')
    config.add_static_view('static', 'static', cache_max_age=3600)
Para este primer ejercicio, basta con crear un fichero en views/ de cualquier nombre (Sí, cualquier nombre, la magia de scan) de la siguiente forma:
# coding: utf-8
from pyramid.view import view_config

@view_config(route_name='home', renderer='json')
def get_actividad_list(request):
    return {'home': '¡Hola Mundo!'}
Volvemos un directorio atrás y corremos la aplicación:
$ pserve development.ini --reload
Ahora lo probamos, desde consola porque somos gente hardcore que odia a los navegadores (¡Muerte a los navegadores!)
$ curl -i -X GET http://localhost:6543
HTTP/1.1 200 OK
Content-Length: 29
Content-Type: application/json; charset=UTF-8
Date: Tue, 17 May 2016 04:11:18 GMT
Server: waitress


{"home": "\u00a1Hola Mundo!"}
Preparé una especie de test que sale más extenso que el código que se quiere testar, tendrá que valer para primer esfuerzo. El punto más discutible es el de la fixture, sin embargo la idea debería estar en ese sentido. El contenido de tests.py queda de la siguiente forma:
# coding: utf-8
from unittest import TestCase

from pyramid import testing

class ViewTests(TestCase):
    def setUp(self):
        self.config = testing.setUp()
        # Atentos a nuestro intento de fixture
        self.res_get_actividad_list = {'home': '¡Hola Mundo!'}

    def tearDown(self):
        testing.tearDown()

    def test_get_actividad_list(self):
        # Hacemos las importaciones acá en lugar de globalmente para que el test sea más sincero
        from .views.actividades import get_actividad_list
        peticion = testing.DummyRequest()
        respuesta = get_actividad_list(peticion)
        self.assertEqual(respuesta['home'], self.res_get_actividad_list['home'])

class ViewFunctionalTest(TestCase):
    def setUp(self):
        # Este método debería considerarse un __init__ más apropiado en la cuestión de testeo
        # Hacemos las importaciones acá en lugar de globalmente para que el test sea más sincero
        from aplicacion import main
        app = main({})
        from webtest import TestApp
      
        self.testapp = TestApp(app)

        # Atentos a nuestro intento de fixture
        self.res_get_actividad_list = {'home': u'¡Hola Mundo!'}
  
    def test_get_actividad_list(self):
        respuesta = self.testapp.get('/', status=200, xhr=True)
        # Es discutible que haga esto, supongo que me aseguro que no vaya a cambiar la respuesta por accidente
        self.assertEqual('application/json', respuesta.content_type) 
        # Trato de usar la fixture para verificar contenido ya conocido
        proposicion = self.res_get_actividad_list['home']
        self.assertEqual(respuesta.json_body['home'], proposicion)

Ahora corremos el test en el directorio raíz de la aplicación
(ambiente)vtacius@ilaria:~/public_html/ambiente/aplicacion> nosetests
..
Name                              Stmts   Miss  Cover   Missing
---------------------------------------------------------------
aplicacion.py                         8      0   100%   
aplicacion/tests.py                  25      0   100%   
aplicacion/views.py                   0      0   100%   
aplicacion/views/actividades.py       3      0   100%   
---------------------------------------------------------------
TOTAL                                36      0   100%   
----------------------------------------------------------------------
Ran 2 tests in 1.123s

OK
Claro, los test automatizados son realmente algo increíble. Apenas pueden reemplazar al hecho de usar curl desde consola, pero todo lo compensan con el hecho de ser automatizados.
El siguiente paso es revisar la aplicación que vamos a publicar por medio de pyramid: fichero_app

Otros apuntes interesantes

Otros apuntes interesantes