#include <Python.h>
#define SSL_MODULE
#include "ssl.h"
/* CVS keyword-expanded identification string for this file; it is not
 * referenced by any other code visible in this file. */
static char *CVSid = "@(#) $Id: context.c,v 1.2 2004/09/23 14:25:28 murata Exp $";
/*
 * Passphrase callback handed to OpenSSL; it forwards the request to the
 * Python callable stored in the Context as passphrase_callback.
 *
 * Arguments: buf    - Buffer that receives the passphrase bytes
 *            maxlen - Capacity of buf; longer passphrases are truncated
 *            verify - Passed through unchanged to the Python callback
 *            arg    - The ssl_ContextObj registered as callback userdata
 * Returns:   The number of bytes written to buf, or 0 if the argument tuple
 *            could not be built, the Python callback raised, or it returned
 *            a false value or a non-string object.  When the callback raises,
 *            the Python exception is left set for the caller to inspect.
 */
static int
global_passphrase_callback(char *buf, int maxlen, int verify, void *arg)
{
    int len = 0;
    int reacquired;
    char *str;
    PyObject *argv, *ret = NULL;
    ssl_ContextObj *ctx = (ssl_ContextObj *)arg;

    /*
     * A non-NULL tstate means the caller saved its thread state before
     * entering OpenSSL.  Restore it BEFORE touching any Python API (building
     * the argument tuple included) and give it up again only after the last
     * reference count operation.
     * NOTE(review): assumes MY_END_ALLOW_THREADS re-acquires the interpreter
     * lock and MY_BEGIN_ALLOW_THREADS releases it - confirm against ssl.h.
     */
    reacquired = (ctx->tstate != NULL);
    if (reacquired)
    {
        MY_END_ALLOW_THREADS(ctx->tstate);
    }

    argv = Py_BuildValue("(iiO)", maxlen, verify, ctx->passphrase_userdata);
    if (argv == NULL)
        goto out;

    ret = PyEval_CallObject(ctx->passphrase_callback, argv);
    Py_DECREF(argv);
    if (ret == NULL)
        goto out;

    /* Only a true-valued string result is accepted as a passphrase. */
    if (PyObject_IsTrue(ret) && PyString_Check(ret))
    {
        len = PyString_Size(ret);
        if (len > maxlen)
            len = maxlen;
        str = PyString_AsString(ret);
        strncpy(buf, str, len);
    }
    Py_DECREF(ret);

out:
    if (reacquired)
    {
        MY_BEGIN_ALLOW_THREADS(ctx->tstate);
    }
    return len;
}
/*
 * Certificate verification callback handed to OpenSSL; it forwards the
 * decision to the Python callable stored in the Context as verify_callback.
 * The callable receives (connection, certificate, errnum, errdepth, ok).
 *
 * Arguments: ok       - OpenSSL's own verdict, passed on to the callback
 *            x509_ctx - The store context describing the current check
 * Returns:   1 if the Python callback returned a true value (the store
 *            context error is then reset to X509_V_OK), 0 otherwise -
 *            including when the callback raised or its arguments could not
 *            be constructed.
 */
static int
global_verify_callback(int ok, X509_STORE_CTX *x509_ctx)
{
    PyObject *argv, *ret;
    SSL *ssl;
    ssl_ConnectionObj *conn;
    crypto_X509Obj *cert;
    int errnum, errdepth;
    int c_ret = 0;
    int reacquired;

    /* Pure OpenSSL lookups; no Python API is involved yet. */
    ssl = (SSL *)X509_STORE_CTX_get_app_data(x509_ctx);
    conn = (ssl_ConnectionObj *)SSL_get_app_data(ssl);

    /*
     * A non-NULL tstate means the caller saved its thread state before
     * entering OpenSSL.  Restore it BEFORE creating any Python object and
     * give it up again only after the last reference count operation.
     * NOTE(review): assumes MY_END_ALLOW_THREADS re-acquires the interpreter
     * lock and MY_BEGIN_ALLOW_THREADS releases it - confirm against ssl.h.
     */
    reacquired = (conn->tstate != NULL);
    if (reacquired)
    {
        MY_END_ALLOW_THREADS(conn->tstate);
    }

    /* NOTE(review): presumably returns NULL on failure - confirm. */
    cert = crypto_X509_New(X509_STORE_CTX_get_current_cert(x509_ctx), 0);
    if (cert == NULL)
        goto out;

    errnum = X509_STORE_CTX_get_error(x509_ctx);
    errdepth = X509_STORE_CTX_get_error_depth(x509_ctx);
    argv = Py_BuildValue("(OOiii)", (PyObject *)conn, (PyObject *)cert,
                         errnum, errdepth, ok);
    /* The tuple (if built) holds its own reference to cert. */
    Py_DECREF(cert);
    if (argv == NULL)
        goto out;

    ret = PyEval_CallObject(conn->context->verify_callback, argv);
    Py_DECREF(argv);
    if (ret == NULL)
        goto out;

    if (PyObject_IsTrue(ret))
    {
        X509_STORE_CTX_set_error(x509_ctx, X509_V_OK);
        c_ret = 1;
    }
    Py_DECREF(ret);

out:
    if (reacquired)
    {
        MY_BEGIN_ALLOW_THREADS(conn->tstate);
    }
    return c_ret;
}
/*
 * Info callback handed to OpenSSL; it forwards the event to the Python
 * callable stored in the Context as info_callback, called with
 * (connection, where, ret).  Any Python error raised along the way is
 * cleared, since this callback has no way to report failure.
 *
 * Arguments: ssl   - The SSL structure the event belongs to
 *            where - Passed through unchanged to the Python callback
 *            _ret  - Passed through unchanged to the Python callback
 * Returns:   Nothing
 */
static void
global_info_callback(SSL *ssl, int where, int _ret)
{
    ssl_ConnectionObj *conn = (ssl_ConnectionObj *)SSL_get_app_data(ssl);
    PyObject *argv, *ret;
    int reacquired;

    /*
     * A non-NULL tstate means the caller saved its thread state before
     * entering OpenSSL.  Restore it BEFORE building the argument tuple and
     * give it up again only after the last reference count operation.
     * NOTE(review): assumes MY_END_ALLOW_THREADS re-acquires the interpreter
     * lock and MY_BEGIN_ALLOW_THREADS releases it - confirm against ssl.h.
     */
    reacquired = (conn->tstate != NULL);
    if (reacquired)
    {
        MY_END_ALLOW_THREADS(conn->tstate);
    }

    argv = Py_BuildValue("(Oii)", (PyObject *)conn, where, _ret);
    if (argv == NULL)
    {
        PyErr_Clear();
    }
    else
    {
        ret = PyEval_CallObject(conn->context->info_callback, argv);
        Py_DECREF(argv);
        if (ret == NULL)
            PyErr_Clear();
        else
            Py_DECREF(ret);
    }

    if (reacquired)
    {
        MY_BEGIN_ALLOW_THREADS(conn->tstate);
    }
}
static char ssl_Context_load_verify_locations_doc[] = "\n\
Let SSL know where we can find trusted certificates for the certificate\n\
chain\n\
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be:\n\
cafile - Which file we can find the certificates\n\
Returns: None\n\
";
/*
 * Context.load_verify_locations(cafile): hand the CA file name to
 * SSL_CTX_load_verify_locations (no CA directory is passed).  Raises from
 * the OpenSSL error queue when the call reports failure.
 */
static PyObject *
ssl_Context_load_verify_locations(ssl_ContextObj *self, PyObject *args)
{
    char *ca_filename;

    if (!PyArg_ParseTuple(args, "s:load_verify_locations", &ca_filename))
        return NULL;

    if (SSL_CTX_load_verify_locations(self->ctx, ca_filename, NULL))
    {
        Py_INCREF(Py_None);
        return Py_None;
    }

    exception_from_error_queue();
    return NULL;
}
static char ssl_Context_set_passwd_cb_doc[] = "\n\
Set the passphrase callback\n\
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be:\n\
callback - The Python callback to use\n\
userdata - (optional) A Python object which will be given as\n\
argument to the callback\n\
Returns: None\n\
";
/*
 * Context.set_passwd_cb(callback[, userdata]): remember the Python callable
 * and its userdata on the Context and register global_passphrase_callback
 * with OpenSSL, using the Context itself as the C-level userdata.  Raises
 * TypeError when callback is not callable.
 */
static PyObject *
ssl_Context_set_passwd_cb(ssl_ContextObj *self, PyObject *args)
{
    PyObject *callback = NULL, *userdata = Py_None;
    PyObject *old_callback, *old_userdata;

    if (!PyArg_ParseTuple(args, "O|O:set_passwd_cb", &callback, &userdata))
        return NULL;

    if (!PyCallable_Check(callback))
    {
        PyErr_SetString(PyExc_TypeError, "expected PyCallable");
        return NULL;
    }

    /*
     * Install the new references first and release the old ones last:
     * dropping a reference can run arbitrary Python code (a destructor),
     * which must never observe the Context pointing at a dead object.
     */
    Py_INCREF(callback);
    Py_INCREF(userdata);
    old_callback = self->passphrase_callback;
    old_userdata = self->passphrase_userdata;
    self->passphrase_callback = callback;
    self->passphrase_userdata = userdata;

    SSL_CTX_set_default_passwd_cb(self->ctx, global_passphrase_callback);
    SSL_CTX_set_default_passwd_cb_userdata(self->ctx, (void *)self);

    Py_DECREF(old_callback);
    Py_DECREF(old_userdata);

    Py_INCREF(Py_None);
    return Py_None;
}
static char ssl_Context_use_certificate_file_doc[] = "\n\
Load a certificate from a file\n\
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be:\n\
certfile - The name of the certificate file\n\
filetype - (optional) The encoding of the file, default is PEM\n\
Returns: None\n\
";
/*
 * Context.use_certificate_file(certfile[, filetype]): load a certificate
 * through SSL_CTX_use_certificate_file.  filetype defaults to
 * SSL_FILETYPE_PEM.  Raises from the OpenSSL error queue on failure.
 */
static PyObject *
ssl_Context_use_certificate_file(ssl_ContextObj *self, PyObject *args)
{
    char *cert_path;
    int encoding = SSL_FILETYPE_PEM;

    if (!PyArg_ParseTuple(args, "s|i:use_certificate_file", &cert_path, &encoding))
        return NULL;

    if (SSL_CTX_use_certificate_file(self->ctx, cert_path, encoding))
    {
        Py_INCREF(Py_None);
        return Py_None;
    }

    exception_from_error_queue();
    return NULL;
}
static char ssl_Context_use_certificate_doc[] = "\n\
Load a certificate from a X509 object\n\
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be:\n\
cert - The X509 object\n\
Returns: None\n\
";
/*
 * Context.use_certificate(cert): install the certificate wrapped by an X509
 * object via SSL_CTX_use_certificate.
 *
 * The X509 type object is not known up front.  The first argument whose
 * type is named "X509" and has the size of crypto_X509Obj is accepted and
 * its type is cached in a function-static pointer; every later call
 * type-checks against that cached type.
 */
static PyObject *
ssl_Context_use_certificate(ssl_ContextObj *self, PyObject *args)
{
    static PyTypeObject *crypto_X509_type = NULL;
    crypto_X509Obj *cert;

    if (crypto_X509_type != NULL)
    {
        /* Type already learned: let the argument parser enforce it. */
        if (!PyArg_ParseTuple(args, "O!:use_certificate", crypto_X509_type,
                              &cert))
            return NULL;
    }
    else
    {
        int name_matches, size_matches;

        if (!PyArg_ParseTuple(args, "O:use_certificate", &cert))
            return NULL;

        name_matches = (strcmp(cert->ob_type->tp_name, "X509") == 0);
        size_matches = name_matches &&
            (cert->ob_type->tp_basicsize == sizeof(crypto_X509Obj));
        if (!size_matches)
        {
            PyErr_SetString(PyExc_TypeError, "Expected an X509 object");
            return NULL;
        }

        crypto_X509_type = cert->ob_type;
    }

    if (SSL_CTX_use_certificate(self->ctx, cert->x509))
    {
        Py_INCREF(Py_None);
        return Py_None;
    }

    exception_from_error_queue();
    return NULL;
}
static char ssl_Context_use_privatekey_file_doc[] = "\n\
Load a private key from a file\n\
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be:\n\
keyfile - The name of the key file\n\
filetype - (optional) The encoding of the file, default is PEM\n\
Returns: None\n\
";
/*
 * Context.use_privatekey_file(keyfile[, filetype]): load a private key
 * through SSL_CTX_use_PrivateKey_file, wrapped in the
 * MY_BEGIN/END_ALLOW_THREADS pair on the Context's tstate.
 *
 * A Python exception that is already pending after the call takes
 * precedence: the OpenSSL error queue is flushed and that exception
 * propagates.  Otherwise an OpenSSL failure is raised from the error queue.
 */
static PyObject *
ssl_Context_use_privatekey_file(ssl_ContextObj *self, PyObject *args)
{
    char *key_path;
    int encoding = SSL_FILETYPE_PEM;
    int loaded;

    if (!PyArg_ParseTuple(args, "s|i:use_privatekey_file", &key_path, &encoding))
        return NULL;

    MY_BEGIN_ALLOW_THREADS(self->tstate);
    loaded = SSL_CTX_use_PrivateKey_file(self->ctx, key_path, encoding);
    MY_END_ALLOW_THREADS(self->tstate);

    if (PyErr_Occurred())
    {
        flush_error_queue();
        return NULL;
    }

    if (loaded)
    {
        Py_INCREF(Py_None);
        return Py_None;
    }

    exception_from_error_queue();
    return NULL;
}
static char ssl_Context_use_privatekey_doc[] = "\n\
Load a private key from a PKey object\n\
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be:\n\
pkey - The PKey object\n\
Returns: None\n\
";
/*
 * Context.use_privatekey(pkey): install the key wrapped by a PKey object via
 * SSL_CTX_use_PrivateKey.
 *
 * The PKey type object is not known up front.  The first argument whose
 * type is named "PKey" and has the size of crypto_PKeyObj is accepted and
 * its type is cached in a function-static pointer; every later call
 * type-checks against that cached type.
 */
static PyObject *
ssl_Context_use_privatekey(ssl_ContextObj *self, PyObject *args)
{
    static PyTypeObject *crypto_PKey_type = NULL;
    crypto_PKeyObj *pkey;

    if (crypto_PKey_type != NULL)
    {
        /* Type already learned: let the argument parser enforce it. */
        if (!PyArg_ParseTuple(args, "O!:use_privatekey", crypto_PKey_type, &pkey))
            return NULL;
    }
    else
    {
        int name_matches, size_matches;

        if (!PyArg_ParseTuple(args, "O:use_privatekey", &pkey))
            return NULL;

        name_matches = (strcmp(pkey->ob_type->tp_name, "PKey") == 0);
        size_matches = name_matches &&
            (pkey->ob_type->tp_basicsize == sizeof(crypto_PKeyObj));
        if (!size_matches)
        {
            PyErr_SetString(PyExc_TypeError, "Expected a PKey object");
            return NULL;
        }

        crypto_PKey_type = pkey->ob_type;
    }

    if (SSL_CTX_use_PrivateKey(self->ctx, pkey->pkey))
    {
        Py_INCREF(Py_None);
        return Py_None;
    }

    exception_from_error_queue();
    return NULL;
}
static char ssl_Context_check_privatekey_doc[] = "\n\
Check that the private key and certificate match up\n\
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be empty\n\
Returns: None (raises an exception if something's wrong)\n\
";
/*
 * Context.check_privatekey(): run SSL_CTX_check_private_key on the wrapped
 * context and raise from the OpenSSL error queue when it reports failure.
 */
static PyObject *
ssl_Context_check_privatekey(ssl_ContextObj *self, PyObject *args)
{
    if (!PyArg_ParseTuple(args, ":check_privatekey"))
        return NULL;

    if (SSL_CTX_check_private_key(self->ctx))
    {
        Py_INCREF(Py_None);
        return Py_None;
    }

    exception_from_error_queue();
    return NULL;
}
static char ssl_Context_load_client_ca_doc[] = "\n\
Load the trusted certificates that will be sent to the client (basically\n\
telling the client \"These are the guys I trust\")\n\
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be:\n\
cafile - The name of the certificates file\n\
Returns: None\n\
";
/*
 * Context.load_client_ca(cafile): read the CA names from cafile with
 * SSL_load_client_CA_file and install them with SSL_CTX_set_client_CA_list.
 *
 * A NULL result from the loader is reported as an exception built from the
 * OpenSSL error queue instead of being installed silently, which matches how
 * the other loaders in this file report OpenSSL failures.
 */
static PyObject *
ssl_Context_load_client_ca(ssl_ContextObj *self, PyObject *args)
{
    char *cafile;
    STACK_OF(X509_NAME) *ca_names;

    if (!PyArg_ParseTuple(args, "s:load_client_ca", &cafile))
        return NULL;

    ca_names = SSL_load_client_CA_file(cafile);
    if (ca_names == NULL)
    {
        exception_from_error_queue();
        return NULL;
    }

    /* The name list is handed over to the context here. */
    SSL_CTX_set_client_CA_list(self->ctx, ca_names);

    Py_INCREF(Py_None);
    return Py_None;
}
static char ssl_Context_set_session_id_doc[] = "\n\
Set the session identifier, this is needed if you want to do session\n\
resumption (which, ironically, isn't implemented yet)\n\
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be:\n\
buf - A Python object that can be safely converted to a string\n\
Returns: None\n\
";
/*
 * Context.set_session_id(buf): pass the bytes of buf and their length to
 * SSL_CTX_set_session_id_context.  Raises from the OpenSSL error queue when
 * the call reports failure.
 */
static PyObject *
ssl_Context_set_session_id(ssl_ContextObj *self, PyObject *args)
{
    char *id_bytes;
    int id_len;

    if (!PyArg_ParseTuple(args, "s#:set_session_id", &id_bytes, &id_len))
        return NULL;

    if (SSL_CTX_set_session_id_context(self->ctx, id_bytes, id_len))
    {
        Py_INCREF(Py_None);
        return Py_None;
    }

    exception_from_error_queue();
    return NULL;
}
static char ssl_Context_set_verify_doc[] = "\n\
Set the verify mode and verify callback\n\
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be:\n\
mode - The verify mode, this is either SSL_VERIFY_NONE or\n\
SSL_VERIFY_PEER combined with possible other flags\n\
callback - The Python callback to use\n\
Returns: None\n\
";
/*
 * Context.set_verify(mode, callback): remember the Python callable on the
 * Context and register global_verify_callback with OpenSSL for the given
 * mode.  Raises TypeError when callback is not callable.
 */
static PyObject *
ssl_Context_set_verify(ssl_ContextObj *self, PyObject *args)
{
    int mode;
    PyObject *callback = NULL;
    PyObject *old_callback;

    if (!PyArg_ParseTuple(args, "iO:set_verify", &mode, &callback))
        return NULL;

    if (!PyCallable_Check(callback))
    {
        PyErr_SetString(PyExc_TypeError, "expected PyCallable");
        return NULL;
    }

    /*
     * Install the new reference first and release the old one last:
     * dropping a reference can run arbitrary Python code (a destructor),
     * which must never observe the Context pointing at a dead object.
     */
    Py_INCREF(callback);
    old_callback = self->verify_callback;
    self->verify_callback = callback;
    SSL_CTX_set_verify(self->ctx, mode, global_verify_callback);
    Py_DECREF(old_callback);

    Py_INCREF(Py_None);
    return Py_None;
}
static char ssl_Context_set_verify_depth_doc[] = "\n\
Set the verify depth\n\
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be:\n\
depth - An integer specifying the verify depth\n\
Returns: None\n\
";
/*
 * Context.set_verify_depth(depth): forward the integer to
 * SSL_CTX_set_verify_depth.  Always returns None once the argument parsed.
 */
static PyObject *
ssl_Context_set_verify_depth(ssl_ContextObj *self, PyObject *args)
{
    int max_depth;

    if (PyArg_ParseTuple(args, "i:set_verify_depth", &max_depth))
    {
        SSL_CTX_set_verify_depth(self->ctx, max_depth);
        Py_INCREF(Py_None);
        return Py_None;
    }

    return NULL;
}
static char ssl_Context_get_verify_mode_doc[] = "\n\
Get the verify mode\n\
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be empty\n\
Returns: The verify mode\n\
";
/*
 * Context.get_verify_mode(): return the value of SSL_CTX_get_verify_mode as
 * a Python int.
 */
static PyObject *
ssl_Context_get_verify_mode(ssl_ContextObj *self, PyObject *args)
{
    if (!PyArg_ParseTuple(args, ":get_verify_mode"))
        return NULL;

    return PyInt_FromLong((long)SSL_CTX_get_verify_mode(self->ctx));
}
static char ssl_Context_get_verify_depth_doc[] = "\n\
Get the verify depth\n\
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be empty\n\
Returns: The verify depth\n\
";
/*
 * Context.get_verify_depth(): return the value of SSL_CTX_get_verify_depth
 * as a Python int.
 */
static PyObject *
ssl_Context_get_verify_depth(ssl_ContextObj *self, PyObject *args)
{
    if (!PyArg_ParseTuple(args, ":get_verify_depth"))
        return NULL;

    return PyInt_FromLong((long)SSL_CTX_get_verify_depth(self->ctx));
}
static char ssl_Context_load_tmp_dh_doc[] = "\n\
Load parameters for Ephemeral Diffie-Hellman\n\
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be:\n\
dhfile - The file to load EDH parameters from\n\
Returns: None\n\
";
/*
 * Context.load_tmp_dh(dhfile): read PEM-encoded DH parameters from dhfile
 * and install them with SSL_CTX_set_tmp_dh.
 *
 * Failure to parse the parameters, or to install them, raises an exception
 * built from the OpenSSL error queue rather than reporting success.
 */
static PyObject *
ssl_Context_load_tmp_dh(ssl_ContextObj *self, PyObject *args)
{
    char *dhfile;
    BIO *bio;
    DH *dh;
    long installed;

    if (!PyArg_ParseTuple(args, "s:load_tmp_dh", &dhfile))
        return NULL;

    bio = BIO_new_file(dhfile, "r");
    if (bio == NULL)
    {
        /* NOTE(review): this also fires when the file cannot be opened;
         * MemoryError is kept so existing callers see the same exception
         * type as before. */
        return PyErr_NoMemory();
    }

    dh = PEM_read_bio_DHparams(bio, NULL, NULL, NULL);
    BIO_free(bio);
    if (dh == NULL)
    {
        exception_from_error_queue();
        return NULL;
    }

    installed = SSL_CTX_set_tmp_dh(self->ctx, dh);
    /* Our copy is released whether or not installation succeeded, exactly
     * as the unconditional DH_free did before. */
    DH_free(dh);
    if (!installed)
    {
        exception_from_error_queue();
        return NULL;
    }

    Py_INCREF(Py_None);
    return Py_None;
}
static char ssl_Context_set_cipher_list_doc[] = "\n\
Change the cipher list\n\
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be:\n\
cipher_list - A cipher list, see ciphers(1)\n\
Returns: None\n\
";
/*
 * Context.set_cipher_list(cipher_list): forward the string to
 * SSL_CTX_set_cipher_list.  Raises from the OpenSSL error queue when the
 * call reports failure.
 */
static PyObject *
ssl_Context_set_cipher_list(ssl_ContextObj *self, PyObject *args)
{
    char *ciphers;

    if (!PyArg_ParseTuple(args, "s:set_cipher_list", &ciphers))
        return NULL;

    if (SSL_CTX_set_cipher_list(self->ctx, ciphers))
    {
        Py_INCREF(Py_None);
        return Py_None;
    }

    exception_from_error_queue();
    return NULL;
}
static char ssl_Context_set_timeout_doc[] = "\n\
Set session timeout\n\
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be:\n\
t - The timeout in seconds\n\
Returns: The previous session timeout\n\
";
/*
 * Context.set_timeout(t): forward t to SSL_CTX_set_timeout and return that
 * call's result as a Python long.
 */
static PyObject *
ssl_Context_set_timeout(ssl_ContextObj *self, PyObject *args)
{
    long seconds;
    long previous;

    if (!PyArg_ParseTuple(args, "l:set_timeout", &seconds))
        return NULL;

    previous = SSL_CTX_set_timeout(self->ctx, seconds);
    return PyLong_FromLong(previous);
}
static char ssl_Context_get_timeout_doc[] = "\n\
Get the session timeout\n\
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be empty\n\
Returns: The session timeout\n\
";
/*
 * Context.get_timeout(): return the value of SSL_CTX_get_timeout as a
 * Python long.
 */
static PyObject *
ssl_Context_get_timeout(ssl_ContextObj *self, PyObject *args)
{
    long seconds;

    if (!PyArg_ParseTuple(args, ":get_timeout"))
        return NULL;

    seconds = SSL_CTX_get_timeout(self->ctx);
    return PyLong_FromLong(seconds);
}
static char ssl_Context_set_info_callback_doc[] = "\n\
Set the info callback\n\
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be:\n\
callback - The Python callback to use\n\
Returns: None\n\
";
/*
 * Context.set_info_callback(callback): remember the Python callable on the
 * Context and register global_info_callback with OpenSSL.  Raises TypeError
 * when callback is not callable.
 */
static PyObject *
ssl_Context_set_info_callback(ssl_ContextObj *self, PyObject *args)
{
    PyObject *callback;
    PyObject *old_callback;

    if (!PyArg_ParseTuple(args, "O:set_info_callback", &callback))
        return NULL;

    if (!PyCallable_Check(callback))
    {
        PyErr_SetString(PyExc_TypeError, "expected PyCallable");
        return NULL;
    }

    /*
     * Install the new reference first and release the old one last:
     * dropping a reference can run arbitrary Python code (a destructor),
     * which must never observe the Context pointing at a dead object.
     */
    Py_INCREF(callback);
    old_callback = self->info_callback;
    self->info_callback = callback;
    SSL_CTX_set_info_callback(self->ctx, global_info_callback);
    Py_DECREF(old_callback);

    Py_INCREF(Py_None);
    return Py_None;
}
static char ssl_Context_get_app_data_doc[] = "\n\
Get the application data (supplied via set_app_data())\n\
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be empty\n\
Returns: The application data\n\
";
/*
 * Context.get_app_data(): return a new reference to the object currently
 * stored in the Context's app_data field.
 */
static PyObject *
ssl_Context_get_app_data(ssl_ContextObj *self, PyObject *args)
{
    PyObject *stored;

    if (!PyArg_ParseTuple(args, ":get_app_data"))
        return NULL;

    stored = self->app_data;
    Py_INCREF(stored);
    return stored;
}
static char ssl_Context_set_app_data_doc[] = "\n\
Set the application data (will be returned from get_app_data())\n\
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be:\n\
data - Any Python object\n\
Returns: None\n\
";
/*
 * Context.set_app_data(data): store a reference to data in the Context's
 * app_data field, releasing the previously stored object.
 */
static PyObject *
ssl_Context_set_app_data(ssl_ContextObj *self, PyObject *args)
{
    PyObject *data;
    PyObject *old_data;

    if (!PyArg_ParseTuple(args, "O:set_app_data", &data))
        return NULL;

    /*
     * Install the new reference first and release the old one last:
     * dropping a reference can run arbitrary Python code (a destructor)
     * that may call get_app_data(), which must never see a dead object.
     */
    Py_INCREF(data);
    old_data = self->app_data;
    self->app_data = data;
    Py_DECREF(old_data);

    Py_INCREF(Py_None);
    return Py_None;
}
static char ssl_Context_get_cert_store_doc[] = "\n\
Get the certificate store for the context\n\
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be empty\n\
Returns: A X509Store object\n\
";
/*
 * Context.get_cert_store(): wrap the store returned by
 * SSL_CTX_get_cert_store with crypto_X509Store_New(store, 0), or return
 * None when the context has no store.
 */
static PyObject *
ssl_Context_get_cert_store(ssl_ContextObj *self, PyObject *args)
{
    X509_STORE *cert_store;

    if (!PyArg_ParseTuple(args, ":get_cert_store"))
        return NULL;

    cert_store = SSL_CTX_get_cert_store(self->ctx);
    if (cert_store != NULL)
        return (PyObject *)crypto_X509Store_New(cert_store, 0);

    Py_INCREF(Py_None);
    return Py_None;
}
static char ssl_Context_set_options_doc[] = "\n\
Add options. Options set before are not cleared!\n\
\n\
Arguments: self - The Context object\n\
args - The Python argument tuple, should be:\n\
options - The options to add.\n\
Returns: The new option bitmask.\n\
";
/*
 * Context.set_options(options): forward the value to SSL_CTX_set_options
 * and return that call's result as a Python int.
 */
static PyObject *
ssl_Context_set_options(ssl_ContextObj *self, PyObject *args)
{
    long requested;

    if (PyArg_ParseTuple(args, "l:set_options", &requested))
        return PyInt_FromLong(SSL_CTX_set_options(self->ctx, requested));

    return NULL;
}
/*
 * ADD_METHOD(name) expands to one PyMethodDef entry: the Python-visible
 * method name, the C function ssl_Context_<name>, the METH_VARARGS calling
 * convention and the docstring ssl_Context_<name>_doc.
 */
#define ADD_METHOD(name) { #name, (PyCFunction)ssl_Context_##name, METH_VARARGS, ssl_Context_##name##_doc }
/* Method table for Context objects; looked up by ssl_Context_getattr. */
static PyMethodDef ssl_Context_methods[] = {
ADD_METHOD(load_verify_locations),
ADD_METHOD(set_passwd_cb),
ADD_METHOD(use_certificate_file),
ADD_METHOD(use_certificate),
ADD_METHOD(use_privatekey_file),
ADD_METHOD(use_privatekey),
ADD_METHOD(check_privatekey),
ADD_METHOD(load_client_ca),
ADD_METHOD(set_session_id),
ADD_METHOD(set_verify),
ADD_METHOD(set_verify_depth),
ADD_METHOD(get_verify_mode),
ADD_METHOD(get_verify_depth),
ADD_METHOD(load_tmp_dh),
ADD_METHOD(set_cipher_list),
ADD_METHOD(set_timeout),
ADD_METHOD(get_timeout),
ADD_METHOD(set_info_callback),
ADD_METHOD(get_app_data),
ADD_METHOD(set_app_data),
ADD_METHOD(get_cert_store),
ADD_METHOD(set_options),
/* Sentinel entry terminating the table. */
{ NULL, NULL }
};
#undef ADD_METHOD
ssl_ContextObj *
ssl_Context_New(int i_method)
{
SSL_METHOD *method;
ssl_ContextObj *self;
switch (i_method)
{
case ssl_SSLv2_METHOD: method = SSLv2_method(); break;
case ssl_SSLv23_METHOD: method = SSLv23_method(); break;
case ssl_SSLv3_METHOD: method = SSLv3_method(); break;
case ssl_TLSv1_METHOD: method = TLSv1_method(); break;
default:
PyErr_SetString(PyExc_ValueError, "No such protocol");
return NULL;
}
self = PyObject_New(ssl_ContextObj, &ssl_Context_Type);
if (self == NULL)
return (ssl_ContextObj *)PyErr_NoMemory();
self->ctx = SSL_CTX_new(method);
Py_INCREF(Py_None);
self->passphrase_callback = Py_None;
Py_INCREF(Py_None);
self->verify_callback = Py_None;
Py_INCREF(Py_None);
self->info_callback = Py_None;
Py_INCREF(Py_None);
self->passphrase_userdata = Py_None;
Py_INCREF(Py_None);
self->app_data = Py_None;
SSL_CTX_set_app_data(self->ctx, self);
SSL_CTX_set_mode(self->ctx, SSL_MODE_ENABLE_PARTIAL_WRITE |
SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER |
SSL_MODE_AUTO_RETRY);
self->tstate = NULL;
return self;
}
/*
 * Deallocator for Context objects: frees the wrapped SSL_CTX, drops the
 * references held in the five object slots, then releases the object's own
 * memory.
 *
 * Arguments: self - The Context object being destroyed
 * Returns:   Nothing
 */
static void
ssl_Context_dealloc(ssl_ContextObj *self)
{
SSL_CTX_free(self->ctx);
Py_DECREF(self->passphrase_callback);
Py_DECREF(self->passphrase_userdata);
Py_DECREF(self->verify_callback);
Py_DECREF(self->info_callback);
Py_DECREF(self->app_data);
PyObject_Del(self);
}
/*
 * Attribute lookup for Context objects: resolve name against the
 * ssl_Context_methods table.
 *
 * Arguments: self - The Context object
 *            name - The attribute name being looked up
 * Returns:   Whatever Py_FindMethod returns for that name
 */
static PyObject *
ssl_Context_getattr(ssl_ContextObj *self, char *name)
{
    PyObject *target = (PyObject *)self;

    return Py_FindMethod(ssl_Context_methods, target, name);
}
/*
 * Type object for Context.  The initializer is positional; the slot names
 * in the comments follow the Python 2 PyTypeObject field order.  ob_type is
 * left NULL here and filled in by init_ssl_context.
 */
PyTypeObject ssl_Context_Type = {
PyObject_HEAD_INIT(NULL)
0, /* ob_size */
"Context", /* tp_name */
sizeof(ssl_ContextObj), /* tp_basicsize */
0, /* tp_itemsize */
(destructor)ssl_Context_dealloc, /* tp_dealloc */
NULL, /* tp_print */
(getattrfunc)ssl_Context_getattr, /* tp_getattr */
NULL, /* tp_setattr */
NULL, /* tp_compare */
NULL, /* tp_repr */
NULL, /* tp_as_number */
NULL, /* tp_as_sequence */
NULL, /* tp_as_mapping */
NULL /* tp_hash */
};
/*
 * Finish setting up the Context type and publish it in a module dictionary
 * under the key "ContextType".
 *
 * Arguments: dict - The dictionary to add the type object to
 * Returns:   1 on success, 0 if the dictionary insertion failed
 */
int
init_ssl_context(PyObject *dict)
{
    int status;

    ssl_Context_Type.ob_type = &PyType_Type;
    Py_INCREF(&ssl_Context_Type);

    status = PyDict_SetItemString(dict, "ContextType",
                                  (PyObject *)&ssl_Context_Type);
    return (status == 0) ? 1 : 0;
}