Camel - Zaawansowany Messaging - Podstawy

Dzisiaj chciałbym przedstawić framework Apache Camel, framework umożliwiający integrację systemów za pomocą messagingu, który od jakiegoś czasu z powodzeniem wykorzystuję w swoim projekcie. Zacznę od wprowadzenia teoretycznego a następnie przedstawię prosty system wykorzystujący frameworki Camel oraz Spring-dm, który następnie uruchomię w kontenerze Karaf.

Messaging

Messaging jest jednym ze styli integracji (oprócz file sharingu, shared database oraz remote procedure invocation - zainteresowanych odsyłam do katalogu wzorców integracyjnych) umożliwiający trwałą, asynchroniczną, wydajną, luźną integrację systemów. W szczególności wykorzystywany jest do budowania komponentów MOM (Message Oriented Middleware), czyli warstw pośrednich świadczących usługi innym komponentom i umożliwiający ich asynchroniczną integrację. Pojęcie messagingu jest dobrze znane deweloperom wykorzystującym dystrybucję Java EE, a dokładniej specyfikację JMS oraz ziarna MDB, pełniące role asynchronicznych klientów wiadomości z systemu kolejkowego.

Java Message Service vs Messaging

Poniżej fragment specyfikacji JMS określający jej przeznaczenie

This specification describes the objectives and functionality of the Java TM Message Service (JMS). JMS provides a common way for Java programs to create, send, receive and read an enterprise messaging system’s messages.

JMS specyfikuje więc tylko interfejs, umożliwiający współpracę aplikacji z systemem kolejkowym oraz pewne funkcjonalności i modele przetwarzania, które muszą być zapewniane przez mechanizmy ją implementujące. Mówiąc językiem wzorców integracyjnych JMS jest to tylko Chanel Adapter. Chcąc wykorzystać w aplikacji bardziej zaawansowane pojęcia związane z messagingiem takie jak routing, transformacje wiadomości lub integrację z różnymi endpoint’ami sami musimy zadbać o odpowiednią implementacje. Możemy oczywiście tworzyć komponenty sterujące przepływem wiadomości pomiędzy różnymi kanałami, konwertujące i przetwarzające wiadomości, jednak powoduje to komplikację architektury, wzrost złożoności kodu, zwiększa ryzyko błędów itp. Oprócz tego problem może sprawiać także integracja systemu z różnymi punktami końcowymi (nadawcami lub odbiorcami wiadomości) udostępniającymi swoje funkcjonalności za pomocą różnych interfejsów oraz protokołów zdalnych.

Apache Camel

Apache Camel jest frameworkiem bazującym na katalogu wzorców integracyjnych. Posiada wsparcie dla zaawansowanych mechanizmów routingu, transformacji wiadomości oraz kilkudziesięciu implementacji punktów końcowych dzięki czemu stanowi idealne rozwiązanie do budowy warstw pośrednich. Dzięki Camelowi jesteśmy w stanie np:

  • zintegrować komponent wykorzystujący JMS z komponentem implementującym protokół XMPP (Jabber)

  • odczytać dokument z bazy MongoDB, przesłać go do loggera a następnie wysłać na skrzynkę Gmaila ;)

  • odczytać wiadomość z Twittera, przetworzyć ją, zapisać w znode Zookeepera, wysłać SMSem, lub opublikować na kanale IRC … ;)

Przykładów można mnożyć. Pełny opis funkcjonalności oferowanych przez Camela jest dostępny na stronie domowej projektu.

Prosty przykład użycia

Poniżej przedstawiłem prosty przykład użycia Camela do integracji dwóch komponentów - producenta oraz konsumenta. Aby wszystko zadziałało należy wykonać poniższe sześć kroków.

Krok 1 - instalacja Karafa

Na początku należy zainstalować Karafa:

1
dembol@localhost:/opt$ apache-karaf/bin/admin create camel-example
  • Uruchamiamy instancję
1
2
3
4
5
6
7
8
9
10
11
12
13
14
dembol@localhost:/opt$ apache-karaf/instances/camel-example/bin/karaf
        __ __                  ____      
       / //_/____ __________ _/ __/      
      / ,<  / __ `/ ___/ __ `/ /_        
     / /| |/ /_/ / /  / /_/ / __/        
    /_/ |_|\__,_/_/   \__,_/_/         

  Apache Karaf (2.2.8)

Hit '' for a list of available commands
and '[cmd] --help' for help on a specific command.
Hit '' or 'osgi:shutdown' to shutdown Karaf.

karaf@camel-example>
  • Instalujemy potrzebne bundle Camela oraz Springa
1
2
3
karaf@camel-example> features:addurl mvn:org.apache.camel.karaf/apache-camel/2.10.0/xml/features
karaf@camel-example> features:install camel camel-blueprint camel-stream camel-jms
karaf@camel-example> features:install spring-jms activemq

Krok 2 - tworzymy projekt

W kolejnym etapie tworzymy projekt mavenowy. Poniżej umieściłem pom.xml projektu z wymaganymi zależnościami oraz ze skonfigurowanym instrukcjami dotyczącymi importu pakietów.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
4.0.0
    com.devesion.examples
    camel
    1.0-SNAPSHOT
    bundle
    ${project.groupId}.${project.artifactId}

    
        2.3.7
        3.0.7.RELEASE
        
        1.1.0
        4.2.0
        1.6.5
        0.11.0
    

    
        
            
                org.apache.felix
                maven-bundle-plugin
                ${project.plugin.bundle.version}
                true
                
                    
                        
                            org.osgi.*,
                            org.osgi.framework,
                            org.slf4j,
                            javax.*,
                            org.springmodules.*,
                            org.springframework.scheduling.*,
                            org.springframework.scheduling.support,
                            org.springframework.scheduling.config,
                            org.springframework.scheduling.concurrent,
                            org.springframework.jms.core,
                            org.springframework.beans.factory.config,
                            org.apache.activemq,
                            org.apache.activemq.pool,
                            org.apache.activemq.camel,
                            org.apache.activemq.camel.component,
                            org.apache.camel.component.jms
                        
                    
                
            
        
    

    
        
        
            org.osgi
            org.osgi.compendium
            ${project.plugin.osgi.version}
            provided
        
        
            org.osgi
            org.osgi.core
            ${project.plugin.osgi.version}
            provided
        

        
        
            org.springframework
            spring-beans
            ${project.dependency.spring.maven.artifact.version}
            
                
                    commons-logging
                    commons-logging
                
            
            provided
        
        
            org.springframework
            spring-core
            ${project.dependency.spring.maven.artifact.version}
            
                
                    commons-logging
                    commons-logging
                
            
            provided
        
        
            org.springframework.osgi
            spring-osgi-core
            ${project.dependency.spring.osgi.version}
            
                
                    aopalliance
                    aopalliance
                
                
                    commons-logging
                    commons-logging
                
            
            provided
        
        
            org.springframework
            spring-context
            ${project.dependency.spring.maven.artifact.version}
            
                
                    commons-logging
                    commons-logging
                
            
            provided
        

        
        
            org.projectlombok
            lombok
            ${project.dependency.lombok.version}
            provided
        

        
        
            org.slf4j
            slf4j-log4j12
            ${project.dependency.slf4j.log4j12}
            provided
        

        
        
            org.apache.camel
            camel-core
            2.10.3
        
        
            org.apache.camel
            camel-jms
            2.10.3
        
        
            org.apache.camel
            camel-spring
            2.10.3
        
        
            org.apache.activemq
            activemq-camel
            5.4.2 
        
        
            org.apache.activemq
            activemq-pool
            5.4.2 
        
        
            org.apache.activemq
            activemq-core
            5.4.2 
        

        
        
            org.apache.xbean
            xbean-spring
            3.12

Krok 3 - moduł systemu

Moduł będzie bardzo prosty. Składać się będzie z klas producenta oraz konsumenta wiadomości, których obiekty chcemy zintegrować za pomocą messagingu.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.devesion.examples.camel;

import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * @author dembol
 */
@Slf4j
public class Producer {

    private JmsTemplate jmsTemplate;

    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void produce() {
        log.info("sending message");
        jmsTemplate.send("queue1", new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage msg = session.createTextMessage();
                msg.setText("Message");
                return msg;
            }
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.devesion.examples.camel;

import lombok.extern.slf4j.Slf4j;

/**
 * @author dembol
 */
@Slf4j
public class Consumer {

    public void consume(String body) {
        log.info("message " + body);
    }
}

Odpowiedzialnością producenta jest wysyłanie wiadomości do systemu kolejkowego za pomocą interfejsu JMS. Wiadomości będą adresowane do kolejki queue1. Metodę podłączymy do Task Schedulera Springa, który będzie ją wykonywał co sekundę. Odpowiedzialnością konsumenta jest implementacja metody consume(String body), która będzie wywoływana asynchronicznie przez Camela - będzie pełnić więc rolę punktu końcowego sterowanego zdarzeniami.

Krok 4 - konfiguracja kontekstu Springa

  • Definiujemy bean’y jmsFactory oraz jmsTemplate. Bean jmsTemplate będzie wstrzyknięty do obiektu producenta:
1
tcp://localhost:61616
  • Definiujemy bean’y producer oraz consumer a następnie task scheduler’a, którego konfigurujemy tak aby uruchamiał co sekundę metodę produce() producenta:
  • Kolejnym krokiem jest zdefiniowanie beana activemq używanego przez punkt końcowy Camela oraz wstrzyknięcie do niego konfiguracji JMS.
  • Na samym końcu definiujemy kontekst Camela oraz konfigurujemy routing. Jako endpoint producenta i konsumenta wpisujemy odpowiednio activemq:queue:queue1 oraz bean:consumer. Dzięki temu Camel będzie odczytywał wiadomości z kolejki queue1 i transportował je bezpośrednio do beana consumer. W procesie tym zostanie wykorzystany tzw. bean binding, który określa jakie metody beana zostaną uruchomione oraz w jaki sposób wiadomość będzie przekonwertowana do parametrów wywołania. W naszym przypadku zostanie wykonana metoda consume(String body) beana consumer.
  • Cały kontekst umieszczamy w pliku java/main/resources/META-INF/spring/bundle-context-camel.xml

Krok 5 - uruchamiamy activemq

Kolejnym etapem jest uruchomienie brokera activemq jako niezależnego komponentu systemu (możemy to także zrobić deklaratywnie z poziomu kontekstu springa). Activemq sam zadba o stworzenie odpowiedniego message channel’a (w naszym przypadku kolejki queue1)

1
dembol@localhost:/opt/apache-activemq/bin$ ./activemq setup /home/dembol/.activemqrc
  • Uruchamiamy brokera
1
2
3
4
5
6
7
dembol@localhost:/opt/apache-activemq/bin$ ./activemq start
INFO: Loading '/home/dembol/.activemqrc'
INFO: Using java '/usr/java/jdk/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and 
      log4j.properties to get details
INFO: pidfile created : '/opt/apache-activemq/data/activemq-dembol-Inspiron-7720.pid'
      (pid '5537')

Krok 6 - kompilujemy projekt i instalujemy bundle

* Kompilujemy projekt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
dembol@localhost:~/workspace-idea/examples/camel$ mvn clean package
[INFO] Scanning for projects...
[INFO]                                                                         
[INFO] ------------------------------------------------------------------------
[INFO] Building com.devesion.examples.camel 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- maven-clean-plugin:2.4.1:clean (default-clean) @ camel ---
[INFO] Deleting /home/dembol/workspace-idea/examples/camel/target
[INFO] 
[INFO] --- maven-resources-plugin:2.2:resources (default-resources) @ camel ---
[INFO] Using default encoding to copy filtered resources.
[INFO] 
[INFO] --- maven-compiler-plugin:2.5.1:compile (default-compile) @ camel ---
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 2 source files to /home/dembol/workspace-idea/examples/camel/target/classes
[INFO] 
[INFO] --- maven-resources-plugin:2.2:testResources (default-testResources) @ camel ---
[INFO] Using default encoding to copy filtered resources.
[INFO] 
[INFO] --- maven-compiler-plugin:2.5.1:testCompile (default-testCompile) @ camel ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- maven-surefire-plugin:2.4.3:test (default-test) @ camel ---
[INFO] No tests to run.
[INFO] 
[INFO] --- maven-bundle-plugin:2.3.7:bundle (default-bundle) @ camel ---
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.593s
[INFO] Finished at: Sat Feb 16 20:19:01 CET 2013
[INFO] Final Memory: 15M/238M
[INFO] ------------------------------------------------------------------------
* Kopiujemy bundle do katalogu $KARAF_HOME/instances/camel-example/deploy. Po chwili w logu zobaczymy naprzemienne komunikaty producenta i konsumenta. Jak widać rozwiązanie zadziałało ;)
1
2
3
4
5
6
7
8
9
dembol@localhost:/opt/apache-karaf/instances/camel-example/data/log# tail -f /opt/apache-karaf/instances/camel-example/data/log/karaf.log 
2013-02-16 20:31:24,000 | INFO  | scheduler-1      | Producer                         | 121 - com.devesion.examples.camel - 1.0.0.SNAPSHOT | sending message
2013-02-16 20:31:24,005 | INFO  | Consumer[queue1] | Consumer                         | 121 - com.devesion.examples.camel - 1.0.0.SNAPSHOT | message Message
2013-02-16 20:31:25,000 | INFO  | scheduler-1      | Producer                         | 121 - com.devesion.examples.camel - 1.0.0.SNAPSHOT | sending message
2013-02-16 20:31:25,004 | INFO  | Consumer[queue1] | Consumer                         | 121 - com.devesion.examples.camel - 1.0.0.SNAPSHOT | message Message
2013-02-16 20:31:26,001 | INFO  | scheduler-1      | Producer                         | 121 - com.devesion.examples.camel - 1.0.0.SNAPSHOT | sending message
2013-02-16 20:31:26,005 | INFO  | Consumer[queue1] | Consumer                         | 121 - com.devesion.examples.camel - 1.0.0.SNAPSHOT | message Message
2013-02-16 20:31:27,000 | INFO  | scheduler-1      | Producer                         | 121 - com.devesion.examples.camel - 1.0.0.SNAPSHOT | sending message
2013-02-16 20:31:27,005 | INFO  | Consumer[queue1] | Consumer                         | 121 - com.devesion.examples.camel - 1.0.0.SNAPSHOT | message Message

Podsumowanie

Jak widzimy stworzenie messagingu w oparciu o Camela wymaga wykonania kilku kroków, które mogą wydawać się dość skomplikowane dla osób rozpoczynających pracę z tym frameworkiem. Dla ułatwienia pracy źródła przykładowego projektu umieściłem na githubie, może komuś się przydadzą ;) Zapraszam do korzystania.

Comments